From 1c9b4e398dd2b954dc0414b7c1568f04a3143355 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Thu, 20 Oct 2022 09:43:35 +0200 Subject: [PATCH] acl: add ACL roles to event stream topic and resolve policies. (#14923) This change adds ACL role creation and deletion to the event stream. It is exposed as a single topic with two types; the filter is primarily the role ID but also includes the role name. While conducting this work it was also discovered that the event stream has its own ACL resolution logic. This did not account for ACL tokens which included role links, or tokens with expiry times. ACL role links are now resolved to their policies and tokens are checked for expiry correctly. --- .changelog/14923.txt | 11 + nomad/acl_endpoint.go | 18 +- nomad/acl_endpoint_test.go | 51 +++- nomad/state/events.go | 28 ++ nomad/state/events_test.go | 53 ++++ nomad/state/state_store_acl.go | 34 ++- nomad/stream/event_broker.go | 57 +++- nomad/stream/event_broker_test.go | 411 +++++++++++++++++++++++++++- nomad/structs/acl.go | 19 ++ nomad/structs/acl_test.go | 69 +++++ nomad/structs/event.go | 9 + website/content/api-docs/events.mdx | 4 + 12 files changed, 725 insertions(+), 39 deletions(-) create mode 100644 .changelog/14923.txt diff --git a/.changelog/14923.txt b/.changelog/14923.txt new file mode 100644 index 00000000000..e63a41619c2 --- /dev/null +++ b/.changelog/14923.txt @@ -0,0 +1,11 @@ +```release-note:bug +event stream: Resolve ACL roles within ACL tokens +``` + +```release-note:bug +event stream: Check ACL token expiry when resolving tokens +``` + +```release-note:improvement +event stream: Added ACL role topic with create and delete types +``` diff --git a/nomad/acl_endpoint.go b/nomad/acl_endpoint.go index 21c487ec533..c95629faa1f 100644 --- a/nomad/acl_endpoint.go +++ b/nomad/acl_endpoint.go @@ -1366,8 +1366,8 @@ func (a *ACL) ListRoles( } // GetRolesByID is used to get a set of ACL Roles as defined by their ID. 
This -// endpoint is used by the replication process and uses a specific response in -// order to make that process easier. +// endpoint is used by the replication process and Nomad agent client token +// resolution. func (a *ACL) GetRolesByID(args *structs.ACLRolesByIDRequest, reply *structs.ACLRolesByIDResponse) error { // This endpoint is only used by the replication process which is only @@ -1382,11 +1382,17 @@ func (a *ACL) GetRolesByID(args *structs.ACLRolesByIDRequest, reply *structs.ACL } defer metrics.MeasureSince([]string{"nomad", "acl", "get_roles_id"}, time.Now()) - // Check that the caller has a management token and that ACLs are enabled - // properly. - if acl, err := a.srv.ResolveToken(args.AuthToken); err != nil { + // For client typed tokens, allow them to query any roles associated with + // that token. This is used by Nomad agents in client mode which are + // resolving the roles to enforce. + token, err := a.requestACLToken(args.AuthToken) + if err != nil { return err - } else if acl == nil || !acl.IsManagement() { + } + if token == nil { + return structs.ErrTokenNotFound + } + if token.Type != structs.ACLManagementToken && !token.HasRoles(args.ACLRoleIDs) { return structs.ErrPermissionDenied } diff --git a/nomad/acl_endpoint_test.go b/nomad/acl_endpoint_test.go index 67d2c3f2922..42bddfc0d35 100644 --- a/nomad/acl_endpoint_test.go +++ b/nomad/acl_endpoint_test.go @@ -2228,6 +2228,7 @@ func TestACL_GetRolesByID(t *testing.T) { // Try reading a role without setting a correct auth token. aclRoleReq1 := &structs.ACLRolesByIDRequest{ + ACLRoleIDs: []string{"nope"}, QueryOptions: structs.QueryOptions{ Region: DefaultRegion, }, @@ -2278,6 +2279,48 @@ func TestACL_GetRolesByID(t *testing.T) { require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[0].ID) require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[1].ID) + // Create a client token which allows us to test client tokens looking up + // their own role assignments. 
+ clientToken1 := &structs.ACLToken{ + AccessorID: uuid.Generate(), + SecretID: uuid.Generate(), + Name: "acl-endpoint-test-role", + Type: structs.ACLClientToken, + Roles: []*structs.ACLTokenRoleLink{{ID: aclRoles[0].ID}}, + } + clientToken1.SetHash() + + require.NoError(t, testServer.fsm.State().UpsertACLTokens( + structs.MsgTypeTestSetup, 10, []*structs.ACLToken{clientToken1})) + + // Use the client token in an attempt to look up an ACL role which is + // assigned to the token, and therefore should work. + aclRoleReq4 := &structs.ACLRolesByIDRequest{ + ACLRoleIDs: []string{aclRoles[0].ID}, + QueryOptions: structs.QueryOptions{ + Region: DefaultRegion, + AuthToken: clientToken1.SecretID, + }, + } + var aclRoleResp4 structs.ACLRolesByIDResponse + err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq4, &aclRoleResp4) + require.NoError(t, err) + require.Len(t, aclRoleResp4.ACLRoles, 1) + require.Contains(t, aclRoleResp4.ACLRoles, aclRoles[0].ID) + + // Use the client token in an attempt to look up an ACL role which is NOT + // assigned to the token which should fail. + aclRoleReq5 := &structs.ACLRolesByIDRequest{ + ACLRoleIDs: []string{aclRoles[1].ID}, + QueryOptions: structs.QueryOptions{ + Region: DefaultRegion, + AuthToken: clientToken1.SecretID, + }, + } + var aclRoleResp5 structs.ACLRolesByIDResponse + err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq5, &aclRoleResp5) + require.ErrorContains(t, err, "Permission denied") + // Now test a blocking query, where we wait for an update to the set which // is triggered by a deletion. 
type res struct { @@ -2287,7 +2330,7 @@ func TestACL_GetRolesByID(t *testing.T) { resultCh := make(chan *res) go func(resultCh chan *res) { - aclRoleReq5 := &structs.ACLRolesByIDRequest{ + aclRoleReq6 := &structs.ACLRolesByIDRequest{ ACLRoleIDs: []string{aclRoles[0].ID, aclRoles[1].ID}, QueryOptions: structs.QueryOptions{ Region: DefaultRegion, @@ -2296,9 +2339,9 @@ func TestACL_GetRolesByID(t *testing.T) { MaxQueryTime: 10 * time.Second, }, } - var aclRoleResp5 structs.ACLRolesByIDResponse - err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq5, &aclRoleResp5) - resultCh <- &res{err: err, reply: &aclRoleResp5} + var aclRoleResp6 structs.ACLRolesByIDResponse + err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq6, &aclRoleResp6) + resultCh <- &res{err: err, reply: &aclRoleResp6} }(resultCh) // Delete an ACL role from state which should return the blocking query. diff --git a/nomad/state/events.go b/nomad/state/events.go index fc448babde4..675b02a5b20 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -28,6 +28,8 @@ var MsgTypeEvents = map[structs.MessageType]string{ structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted, structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted, structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted, + structs.ACLRolesDeleteByIDRequestType: structs.TypeACLRoleDeleted, + structs.ACLRolesUpsertRequestType: structs.TypeACLRoleUpserted, structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration, structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration, structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration, @@ -77,6 +79,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { ACLPolicy: before, }, }, true + case TableACLRoles: + before, ok := change.Before.(*structs.ACLRole) + if !ok { + return structs.Event{}, false + } + return 
structs.Event{ + Topic: structs.TopicACLRole, + Key: before.ID, + FilterKeys: []string{before.Name}, + Payload: &structs.ACLRoleStreamEvent{ + ACLRole: before, + }, + }, true case "nodes": before, ok := change.Before.(*structs.Node) if !ok { @@ -136,6 +151,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { ACLPolicy: after, }, }, true + case TableACLRoles: + after, ok := change.After.(*structs.ACLRole) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicACLRole, + Key: after.ID, + FilterKeys: []string{after.Name}, + Payload: &structs.ACLRoleStreamEvent{ + ACLRole: after, + }, + }, true case "evals": after, ok := change.After.(*structs.Evaluation) if !ok { diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 44a9c326ef1..a1f7902a297 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -1002,6 +1002,59 @@ func Test_eventsFromChanges_ServiceRegistration(t *testing.T) { require.Equal(t, service, eventPayload.Service) } +func Test_eventsFromChanges_ACLRole(t *testing.T) { + ci.Parallel(t) + testState := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer testState.StopEventBroker() + + // Generate a test ACL role. + aclRole := mock.ACLRole() + + // Upsert the role into state, skipping the checks performed to ensure the + // linked policies exist. + writeTxn := testState.db.WriteTxn(10) + updated, err := testState.upsertACLRoleTxn(10, writeTxn, aclRole, true) + require.True(t, updated) + require.NoError(t, err) + writeTxn.Txn.Commit() + + // Pull the events from the stream. + upsertChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ACLRolesUpsertRequestType} + receivedChange := eventsFromChanges(writeTxn, upsertChange) + + // Check the event and its payload are what we are expecting. 
+ require.Len(t, receivedChange.Events, 1) + require.Equal(t, structs.TopicACLRole, receivedChange.Events[0].Topic) + require.Equal(t, aclRole.ID, receivedChange.Events[0].Key) + require.Equal(t, aclRole.Name, receivedChange.Events[0].FilterKeys[0]) + require.Equal(t, structs.TypeACLRoleUpserted, receivedChange.Events[0].Type) + require.Equal(t, uint64(10), receivedChange.Events[0].Index) + + eventPayload := receivedChange.Events[0].Payload.(*structs.ACLRoleStreamEvent) + require.Equal(t, aclRole, eventPayload.ACLRole) + + // Delete the previously upserted ACL role. + deleteTxn := testState.db.WriteTxn(20) + require.NoError(t, testState.deleteACLRoleByIDTxn(deleteTxn, aclRole.ID)) + require.NoError(t, deleteTxn.Insert(tableIndex, &IndexEntry{TableACLRoles, 20})) + deleteTxn.Txn.Commit() + + // Pull the events from the stream. + deleteChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ACLRolesDeleteByIDRequestType} + receivedDeleteChange := eventsFromChanges(deleteTxn, deleteChange) + + // Check the event and its payload are what we are expecting. 
+ require.Len(t, receivedDeleteChange.Events, 1) + require.Equal(t, structs.TopicACLRole, receivedDeleteChange.Events[0].Topic) + require.Equal(t, aclRole.ID, receivedDeleteChange.Events[0].Key) + require.Equal(t, aclRole.Name, receivedDeleteChange.Events[0].FilterKeys[0]) + require.Equal(t, structs.TypeACLRoleDeleted, receivedDeleteChange.Events[0].Type) + require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index) + + eventPayload = receivedChange.Events[0].Payload.(*structs.ACLRoleStreamEvent) + require.Equal(t, aclRole, eventPayload.ACLRole) +} + func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) { t.Helper() diff --git a/nomad/state/state_store_acl.go b/nomad/state/state_store_acl.go index 55891e25485..790bc292b22 100644 --- a/nomad/state/state_store_acl.go +++ b/nomad/state/state_store_acl.go @@ -183,18 +183,8 @@ func (s *StateStore) DeleteACLRolesByID( defer txn.Abort() for _, roleID := range roleIDs { - - existing, err := txn.First(TableACLRoles, indexID, roleID) - if err != nil { - return fmt.Errorf("ACL role lookup failed: %v", err) - } - if existing == nil { - return errors.New("ACL role not found") - } - - // Delete the existing entry from the table. - if err := txn.Delete(TableACLRoles, existing); err != nil { - return fmt.Errorf("ACL role deletion failed: %v", err) + if err := s.deleteACLRoleByIDTxn(txn, roleID); err != nil { + return err } } @@ -206,6 +196,26 @@ func (s *StateStore) DeleteACLRolesByID( return txn.Commit() } +// deleteACLRoleByIDTxn deletes a single ACL role from the state store using the +// provided write transaction. It is the responsibility of the caller to update +// the index table. 
+func (s *StateStore) deleteACLRoleByIDTxn(txn *txn, roleID string) error { + + existing, err := txn.First(TableACLRoles, indexID, roleID) + if err != nil { + return fmt.Errorf("ACL role lookup failed: %v", err) + } + if existing == nil { + return errors.New("ACL role not found") + } + + // Delete the existing entry from the table. + if err := txn.Delete(TableACLRoles, existing); err != nil { + return fmt.Errorf("ACL role deletion failed: %v", err) + } + return nil +} + // GetACLRoles returns an iterator that contains all ACL roles stored within // state. func (s *StateStore) GetACLRoles(ws memdb.WatchSet) (memdb.ResultIterator, error) { diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index da5f8f0abae..179ffbce629 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/armon/go-metrics" "github.com/hashicorp/go-memdb" @@ -213,22 +214,22 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { return !aclAllowsSubscription(aclObj, sub.req) }) - case *structs.ACLPolicyEvent: - // Re-evaluate each subscriptions permissions since a policy - // change may or may not affect the subscription - e.checkSubscriptionsAgainstPolicyChange() + case *structs.ACLPolicyEvent, *structs.ACLRoleStreamEvent: + // Re-evaluate each subscription permission since a policy or + // role change may alter the permissions of the token being + // used for the subscription. + e.checkSubscriptionsAgainstACLChange() } } } } -// checkSubscriptionsAgainstPolicyChange iterates over the brokers -// subscriptions and evaluates whether the token used for the subscription is -// still valid. If it is not valid it closes the subscriptions belonging to the -// token. -// -// A lock must be held to iterate over the map of subscriptions. 
-func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() { +// checkSubscriptionsAgainstACLChange iterates over the broker's subscriptions +// and evaluates whether the token used for the subscription is still valid. A +// token may become invalid if the assigned policies or roles have been updated +// which removed the required permission. If the token is no longer valid, the +// subscription is closed. +func (e *EventBroker) checkSubscriptionsAgainstACLChange() { e.mu.Lock() defer e.mu.Unlock() @@ -257,14 +258,19 @@ func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() { } } -func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) { +func aclObjFromSnapshotForTokenSecretID( + aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) { + aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID) if err != nil { return nil, err } if aclToken == nil { - return nil, errors.New("no token for secret ID") + return nil, structs.ErrTokenNotFound + } + if aclToken.IsExpired(time.Now().UTC()) { + return nil, structs.ErrTokenExpired } // Check if this is a management token @@ -272,7 +278,8 @@ func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache * return acl.ManagementACL, nil } - aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)) + aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles)) + for _, policyName := range aclToken.Policies { policy, err := aclSnapshot.ACLPolicyByName(nil, policyName) if err != nil || policy == nil { @@ -281,12 +288,34 @@ func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache * aclPolicies = append(aclPolicies, policy) } + // Iterate all the token role links, so we can unpack these and identify + // the ACL policies. 
+ for _, roleLink := range aclToken.Roles { + + role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID) + if err != nil { + return nil, err + } + if role == nil { + continue + } + + for _, policyLink := range role.Policies { + policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name) + if err != nil || policy == nil { + return nil, errors.New("error finding acl policy") + } + aclPolicies = append(aclPolicies, policy) + } + } + return structs.CompileACLObject(aclCache, aclPolicies) } type ACLTokenProvider interface { ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error) ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error) + GetACLRoleByID(ws memdb.WatchSet, roleID string) (*structs.ACLRole, error) } type ACLDelegate interface { diff --git a/nomad/stream/event_broker_test.go b/nomad/stream/event_broker_test.go index ea7457aa137..5ccf24cea0e 100644 --- a/nomad/stream/event_broker_test.go +++ b/nomad/stream/event_broker_test.go @@ -9,6 +9,8 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -174,17 +176,23 @@ type fakeACLTokenProvider struct { policyErr error token *structs.ACLToken tokenErr error + role *structs.ACLRole + roleErr error } -func (p *fakeACLTokenProvider) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error) { +func (p *fakeACLTokenProvider) ACLTokenBySecretID(_ memdb.WatchSet, _ string) (*structs.ACLToken, error) { return p.token, p.tokenErr } -func (p *fakeACLTokenProvider) ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error) { +func (p *fakeACLTokenProvider) ACLPolicyByName(_ memdb.WatchSet, _ string) (*structs.ACLPolicy, error) { return p.policy, p.policyErr } -func 
TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { +func (p *fakeACLTokenProvider) GetACLRoleByID(_ memdb.WatchSet, _ string) (*structs.ACLRole, error) { + return p.role, p.roleErr +} + +func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) { ci.Parallel(t) ctx, cancel := context.WithCancel(context.Background()) @@ -578,6 +586,403 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { } } +func TestEventBroker_handleACLUpdates_roleUpdated(t *testing.T) { + ci.Parallel(t) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Generate a UUID to use in all tests for the token secret ID and the role + // ID. + tokenSecretID := uuid.Generate() + roleID := uuid.Generate() + + cases := []struct { + name string + aclPolicy *structs.ACLPolicy + roleBeforePolicyLinks []*structs.ACLRolePolicyLink + roleAfterPolicyLinks []*structs.ACLRolePolicyLink + topics map[structs.Topic][]string + event structs.Event + policyEvent structs.Event + shouldUnsubscribe bool + initialSubErr bool + }{ + { + name: "deployments access policy link removed", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, + shouldUnsubscribe: true, + event: structs.Event{ + Topic: structs.TopicDeployment, + Type: structs.TypeDeploymentUpdate, + Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "evaluations access policy link removed", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: 
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, + shouldUnsubscribe: true, + event: structs.Event{ + Topic: structs.TopicEvaluation, + Type: structs.TypeEvalUpdated, + Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "allocations access policy link removed", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, + shouldUnsubscribe: true, + event: structs.Event{ + Topic: structs.TopicAllocation, + Type: structs.TypeAllocationUpdated, + Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "nodes access policy link removed", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NodePolicy(acl.PolicyRead), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, + shouldUnsubscribe: true, + event: structs.Event{ + Topic: structs.TopicNode, + Type: structs.TypeNodeRegistration, + Payload: structs.NodeStreamEvent{Node: &structs.Node{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: 
structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "deployment access no change", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + shouldUnsubscribe: false, + event: structs.Event{ + Topic: structs.TopicDeployment, + Type: structs.TypeDeploymentUpdate, + Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "evaluations access no change", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + shouldUnsubscribe: false, + event: structs.Event{ + Topic: structs.TopicEvaluation, + Type: structs.TypeEvalUpdated, + Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "allocations access no change", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: 
[]*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + shouldUnsubscribe: false, + event: structs.Event{ + Topic: structs.TopicAllocation, + Type: structs.TypeAllocationUpdated, + Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "nodes access no change", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NodePolicy(acl.PolicyRead), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + shouldUnsubscribe: false, + event: structs.Event{ + Topic: structs.TopicNode, + Type: structs.TypeNodeRegistration, + Payload: structs.NodeStreamEvent{Node: &structs.Node{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + // Build our fake token provider containing the relevant state + // objects and add this to our new delegate. Keeping the token + // provider setup separate means we can easily update its state. 
+ tokenProvider := &fakeACLTokenProvider{ + policy: tc.aclPolicy, + token: &structs.ACLToken{ + SecretID: tokenSecretID, + Roles: []*structs.ACLTokenRoleLink{{ID: roleID}}, + }, + role: &structs.ACLRole{ + ID: uuid.Short(), + Policies: []*structs.ACLRolePolicyLink{ + {Name: tc.aclPolicy.Name}, + }, + }, + } + aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider} + + publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{}) + require.NoError(t, err) + + ns := structs.DefaultNamespace + if tc.event.Namespace != "" { + ns = tc.event.Namespace + } + + sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ + Topics: map[structs.Topic][]string{tc.event.Topic: {"*"}}, + Namespace: ns, + Token: tokenSecretID, + }) + + if tc.initialSubErr { + require.Error(t, err) + require.Nil(t, sub) + return + } + + require.NoError(t, err) + publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}}) + + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) + defer cancel() + _, err = sub.Next(ctx) + require.NoError(t, err) + + // Overwrite the ACL role policy links with the updated version + // which is expected to cause a change in the subscription. + tokenProvider.role.Policies = tc.roleAfterPolicyLinks + + // Publish ACL event triggering subscription re-evaluation + publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}}) + publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}}) + + // If we are expecting to unsubscribe consume the subscription + // until the expected error occurs. 
+ ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) + defer cancel() + if tc.shouldUnsubscribe { + for { + _, err = sub.Next(ctx) + if err != nil { + if err == context.DeadlineExceeded { + require.Fail(t, err.Error()) + } + if err == ErrSubscriptionClosed { + break + } + } + } + } else { + _, err = sub.Next(ctx) + require.NoError(t, err) + } + + publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}}) + + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) + defer cancel() + _, err = sub.Next(ctx) + if tc.shouldUnsubscribe { + require.Equal(t, ErrSubscriptionClosed, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) { + ci.Parallel(t) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + cases := []struct { + name string + inputToken *structs.ACLToken + shouldExpire bool + }{ + { + name: "token does not expire", + inputToken: &structs.ACLToken{ + AccessorID: uuid.Generate(), + SecretID: uuid.Generate(), + ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()), + Type: structs.ACLManagementToken, + }, + shouldExpire: false, + }, + { + name: "token does expire", + inputToken: &structs.ACLToken{ + AccessorID: uuid.Generate(), + SecretID: uuid.Generate(), + ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()), + Type: structs.ACLManagementToken, + }, + shouldExpire: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + // Build our fake token provider containing the relevant state + // objects and add this to our new delegate. Keeping the token + // provider setup separate means we can easily update its state. 
+ tokenProvider := &fakeACLTokenProvider{token: tc.inputToken} + aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider} + + publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{}) + require.NoError(t, err) + + fakeNodeEvent := structs.Event{ + Topic: structs.TopicNode, + Type: structs.TypeNodeRegistration, + Payload: structs.NodeStreamEvent{Node: &structs.Node{}}, + } + + fakeTokenEvent := structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}), + } + + sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ + Topics: map[structs.Topic][]string{structs.TopicAll: {"*"}}, + Token: tc.inputToken.SecretID, + }) + require.NoError(t, err) + require.NotNil(t, sub) + + // Publish an event and check that there is a new item in the + // subscription queue. + publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{fakeNodeEvent}}) + + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) + defer cancel() + _, err = sub.Next(ctx) + require.NoError(t, err) + + // If the test states the token should expire, set the expiration + // time to a previous time. + if tc.shouldExpire { + tokenProvider.token.ExpirationTime = pointer.Of( + time.Date(1987, time.April, 13, 8, 3, 0, 0, time.UTC), + ) + } + + // Publish some events to trigger re-evaluation of the subscription. + publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{fakeTokenEvent}}) + publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{fakeNodeEvent}}) + + // If we are expecting to unsubscribe consume the subscription + // until the expected error occurs. 
+ ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) + defer cancel() + + if tc.shouldExpire { + for { + if _, err = sub.Next(ctx); err != nil { + if err == context.DeadlineExceeded { + require.Fail(t, err.Error()) + } + if err == ErrSubscriptionClosed { + break + } + } + } + } else { + _, err = sub.Next(ctx) + require.NoError(t, err) + } + }) + } +} + func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { eventCh := make(chan subNextResult, 1) go func() { diff --git a/nomad/structs/acl.go b/nomad/structs/acl.go index 3c52e7014a7..f33bd708e83 100644 --- a/nomad/structs/acl.go +++ b/nomad/structs/acl.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-set" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" "golang.org/x/crypto/blake2b" @@ -232,6 +233,24 @@ func (a *ACLToken) IsExpired(t time.Time) bool { return a.ExpirationTime.Before(t) || t.IsZero() } +// HasRoles checks if a given set of role IDs are assigned to the ACL token. It +// does not account for management tokens, therefore it is the responsibility +// of the caller to perform this check, if required. +func (a *ACLToken) HasRoles(roleIDs []string) bool { + + // Generate a set of role IDs that the token is assigned. + roleSet := set.FromFunc(a.Roles, func(roleLink *ACLTokenRoleLink) string { return roleLink.ID }) + + // Iterate the role IDs within the request and check whether these are + // present within the token assignment. + for _, roleID := range roleIDs { + if !roleSet.Contains(roleID) { + return false + } + } + return true +} + // ACLRole is an abstraction for the ACL system which allows the grouping of // ACL policies into a single object. ACL tokens can be created and linked to // a role; the token then inherits all the permissions granted by the policies. 
diff --git a/nomad/structs/acl_test.go b/nomad/structs/acl_test.go index d39ff2ae797..05a497efd93 100644 --- a/nomad/structs/acl_test.go +++ b/nomad/structs/acl_test.go @@ -297,6 +297,75 @@ func TestACLToken_IsExpired(t *testing.T) { } } +func TestACLToken_HasRoles(t *testing.T) { + testCases := []struct { + name string + inputToken *ACLToken + inputRoleIDs []string + expectedOutput bool + }{ + { + name: "client token request all subset", + inputToken: &ACLToken{ + Type: ACLClientToken, + Roles: []*ACLTokenRoleLink{ + {ID: "foo"}, + {ID: "bar"}, + {ID: "baz"}, + }, + }, + inputRoleIDs: []string{"foo", "bar", "baz"}, + expectedOutput: true, + }, + { + name: "client token request partial subset", + inputToken: &ACLToken{ + Type: ACLClientToken, + Roles: []*ACLTokenRoleLink{ + {ID: "foo"}, + {ID: "bar"}, + {ID: "baz"}, + }, + }, + inputRoleIDs: []string{"foo", "baz"}, + expectedOutput: true, + }, + { + name: "client token request one subset", + inputToken: &ACLToken{ + Type: ACLClientToken, + Roles: []*ACLTokenRoleLink{ + {ID: "foo"}, + {ID: "bar"}, + {ID: "baz"}, + }, + }, + inputRoleIDs: []string{"baz"}, + expectedOutput: true, + }, + { + name: "client token request no subset", + inputToken: &ACLToken{ + Type: ACLClientToken, + Roles: []*ACLTokenRoleLink{ + {ID: "foo"}, + {ID: "bar"}, + {ID: "baz"}, + }, + }, + inputRoleIDs: []string{"new"}, + expectedOutput: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualOutput := tc.inputToken.HasRoles(tc.inputRoleIDs) + require.Equal(t, tc.expectedOutput, actualOutput) + }) + } +} + func TestACLRole_SetHash(t *testing.T) { testCases := []struct { name string diff --git a/nomad/structs/event.go b/nomad/structs/event.go index 5b55a4612bd..34c6ad981e5 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -23,6 +23,7 @@ const ( TopicNode Topic = "Node" TopicACLPolicy Topic = "ACLPolicy" TopicACLToken Topic = "ACLToken" + TopicACLRole Topic = "ACLRole" TopicService Topic 
= "Service" TopicAll Topic = "*" @@ -46,6 +47,8 @@ const ( TypeACLTokenUpserted = "ACLTokenUpserted" TypeACLPolicyDeleted = "ACLPolicyDeleted" TypeACLPolicyUpserted = "ACLPolicyUpserted" + TypeACLRoleDeleted = "ACLRoleDeleted" + TypeACLRoleUpserted = "ACLRoleUpserted" TypeServiceRegistration = "ServiceRegistration" TypeServiceDeregistration = "ServiceDeregistration" ) @@ -151,3 +154,9 @@ func (a *ACLTokenEvent) SecretID() string { type ACLPolicyEvent struct { ACLPolicy *ACLPolicy } + +// ACLRoleStreamEvent holds a newly updated or deleted ACL role to be used as an +// event within the event stream. +type ACLRoleStreamEvent struct { + ACLRole *ACLRole +} diff --git a/website/content/api-docs/events.mdx b/website/content/api-docs/events.mdx index 7a9b5bef05c..de6f1cae716 100644 --- a/website/content/api-docs/events.mdx +++ b/website/content/api-docs/events.mdx @@ -33,6 +33,7 @@ by default, requiring a management token. | `*` | `management` | | `ACLToken` | `management` | | `ACLPolicy` | `management` | +| `ACLRole` | `management` | | `Job` | `namespace:read-job` | | `Allocation` | `namespace:read-job` | | `Deployment` | `namespace:read-job` | @@ -67,6 +68,7 @@ by default, requiring a management token. | ---------- | ------------------------------- | | ACLToken | ACLToken | | ACLPolicy | ACLPolicy | +| ACLRole | ACLRole | | Allocation | Allocation (no job information) | | Job | Job | | Evaluation | Evaluation | @@ -83,6 +85,8 @@ by default, requiring a management token. | ACLTokenDeleted | | ACLPolicyUpserted | | ACLPolicyDeleted | +| ACLRoleUpserted | +| ACLRoleDeleted | | AllocationCreated | | AllocationUpdated | | AllocationUpdateDesiredStatus |