Skip to content

Commit

Permalink
acl: add ACL roles to event stream topic and resolve policies. (#14923)
Browse files Browse the repository at this point in the history
This changes adds ACL role creation and deletion to the event
stream. It is exposed as a single topic with two types; the filter
is primarily the role ID but also includes the role name.

While conducting this work it was also discovered that the events
stream has its own ACL resolution logic. This did not account for
ACL tokens which included role links, or tokens with expiry times.
ACL role links are now resolved to their policies and tokens are
checked for expiry correctly.
  • Loading branch information
jrasell authored Oct 20, 2022
1 parent eaea916 commit 1c9b4e3
Show file tree
Hide file tree
Showing 12 changed files with 725 additions and 39 deletions.
11 changes: 11 additions & 0 deletions .changelog/14923.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
```release-note:bug
event stream: Resolve ACL roles within ACL tokens
```

```release-note:bug
event stream: Check ACL token expiry when resolving tokens
```

```release-note:improvement
event stream: Added ACL role topic with create and delete types
```
18 changes: 12 additions & 6 deletions nomad/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,8 +1366,8 @@ func (a *ACL) ListRoles(
}

// GetRolesByID is used to get a set of ACL Roles as defined by their ID. This
// endpoint is used by the replication process and uses a specific response in
// order to make that process easier.
// endpoint is used by the replication process and Nomad agent client token
// resolution.
func (a *ACL) GetRolesByID(args *structs.ACLRolesByIDRequest, reply *structs.ACLRolesByIDResponse) error {

// This endpoint is only used by the replication process which is only
Expand All @@ -1382,11 +1382,17 @@ func (a *ACL) GetRolesByID(args *structs.ACLRolesByIDRequest, reply *structs.ACL
}
defer metrics.MeasureSince([]string{"nomad", "acl", "get_roles_id"}, time.Now())

// Check that the caller has a management token and that ACLs are enabled
// properly.
if acl, err := a.srv.ResolveToken(args.AuthToken); err != nil {
// For client typed tokens, allow them to query any roles associated with
// that token. This is used by Nomad agents in client mode which are
// resolving the roles to enforce.
token, err := a.requestACLToken(args.AuthToken)
if err != nil {
return err
} else if acl == nil || !acl.IsManagement() {
}
if token == nil {
return structs.ErrTokenNotFound
}
if token.Type != structs.ACLManagementToken && !token.HasRoles(args.ACLRoleIDs) {
return structs.ErrPermissionDenied
}

Expand Down
51 changes: 47 additions & 4 deletions nomad/acl_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,7 @@ func TestACL_GetRolesByID(t *testing.T) {

// Try reading a role without setting a correct auth token.
aclRoleReq1 := &structs.ACLRolesByIDRequest{
ACLRoleIDs: []string{"nope"},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
},
Expand Down Expand Up @@ -2278,6 +2279,48 @@ func TestACL_GetRolesByID(t *testing.T) {
require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[0].ID)
require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[1].ID)

// Create a client token which allows us to test client tokens looking up
// their own role assignments.
clientToken1 := &structs.ACLToken{
AccessorID: uuid.Generate(),
SecretID: uuid.Generate(),
Name: "acl-endpoint-test-role",
Type: structs.ACLClientToken,
Roles: []*structs.ACLTokenRoleLink{{ID: aclRoles[0].ID}},
}
clientToken1.SetHash()

require.NoError(t, testServer.fsm.State().UpsertACLTokens(
structs.MsgTypeTestSetup, 10, []*structs.ACLToken{clientToken1}))

// Use the client token in an attempt to look up an ACL role which is
// assigned to the token, and therefore should work.
aclRoleReq4 := &structs.ACLRolesByIDRequest{
ACLRoleIDs: []string{aclRoles[0].ID},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
AuthToken: clientToken1.SecretID,
},
}
var aclRoleResp4 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq4, &aclRoleResp4)
require.NoError(t, err)
require.Len(t, aclRoleResp4.ACLRoles, 1)
require.Contains(t, aclRoleResp4.ACLRoles, aclRoles[0].ID)

// Use the client token in an attempt to look up an ACL role which is NOT
// assigned to the token which should fail.
aclRoleReq5 := &structs.ACLRolesByIDRequest{
ACLRoleIDs: []string{aclRoles[1].ID},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
AuthToken: clientToken1.SecretID,
},
}
var aclRoleResp5 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq5, &aclRoleResp5)
require.ErrorContains(t, err, "Permission denied")

// Now test a blocking query, where we wait for an update to the set which
// is triggered by a deletion.
type res struct {
Expand All @@ -2287,7 +2330,7 @@ func TestACL_GetRolesByID(t *testing.T) {
resultCh := make(chan *res)

go func(resultCh chan *res) {
aclRoleReq5 := &structs.ACLRolesByIDRequest{
aclRoleReq6 := &structs.ACLRolesByIDRequest{
ACLRoleIDs: []string{aclRoles[0].ID, aclRoles[1].ID},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
Expand All @@ -2296,9 +2339,9 @@ func TestACL_GetRolesByID(t *testing.T) {
MaxQueryTime: 10 * time.Second,
},
}
var aclRoleResp5 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq5, &aclRoleResp5)
resultCh <- &res{err: err, reply: &aclRoleResp5}
var aclRoleResp6 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq6, &aclRoleResp6)
resultCh <- &res{err: err, reply: &aclRoleResp6}
}(resultCh)

// Delete an ACL role from state which should return the blocking query.
Expand Down
28 changes: 28 additions & 0 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var MsgTypeEvents = map[structs.MessageType]string{
structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted,
structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted,
structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted,
structs.ACLRolesDeleteByIDRequestType: structs.TypeACLRoleDeleted,
structs.ACLRolesUpsertRequestType: structs.TypeACLRoleUpserted,
structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration,
structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration,
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
Expand Down Expand Up @@ -77,6 +79,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
ACLPolicy: before,
},
}, true
case TableACLRoles:
before, ok := change.Before.(*structs.ACLRole)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicACLRole,
Key: before.ID,
FilterKeys: []string{before.Name},
Payload: &structs.ACLRoleStreamEvent{
ACLRole: before,
},
}, true
case "nodes":
before, ok := change.Before.(*structs.Node)
if !ok {
Expand Down Expand Up @@ -136,6 +151,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
ACLPolicy: after,
},
}, true
case TableACLRoles:
after, ok := change.After.(*structs.ACLRole)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicACLRole,
Key: after.ID,
FilterKeys: []string{after.Name},
Payload: &structs.ACLRoleStreamEvent{
ACLRole: after,
},
}, true
case "evals":
after, ok := change.After.(*structs.Evaluation)
if !ok {
Expand Down
53 changes: 53 additions & 0 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,59 @@ func Test_eventsFromChanges_ServiceRegistration(t *testing.T) {
require.Equal(t, service, eventPayload.Service)
}

func Test_eventsFromChanges_ACLRole(t *testing.T) {
ci.Parallel(t)
testState := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer testState.StopEventBroker()

// Generate a test ACL role.
aclRole := mock.ACLRole()

// Upsert the role into state, skipping the checks perform to ensure the
// linked policies exist.
writeTxn := testState.db.WriteTxn(10)
updated, err := testState.upsertACLRoleTxn(10, writeTxn, aclRole, true)
require.True(t, updated)
require.NoError(t, err)
writeTxn.Txn.Commit()

// Pull the events from the stream.
upsertChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ACLRolesUpsertRequestType}
receivedChange := eventsFromChanges(writeTxn, upsertChange)

// Check the event, and it's payload are what we are expecting.
require.Len(t, receivedChange.Events, 1)
require.Equal(t, structs.TopicACLRole, receivedChange.Events[0].Topic)
require.Equal(t, aclRole.ID, receivedChange.Events[0].Key)
require.Equal(t, aclRole.Name, receivedChange.Events[0].FilterKeys[0])
require.Equal(t, structs.TypeACLRoleUpserted, receivedChange.Events[0].Type)
require.Equal(t, uint64(10), receivedChange.Events[0].Index)

eventPayload := receivedChange.Events[0].Payload.(*structs.ACLRoleStreamEvent)
require.Equal(t, aclRole, eventPayload.ACLRole)

// Delete the previously upserted ACL role.
deleteTxn := testState.db.WriteTxn(20)
require.NoError(t, testState.deleteACLRoleByIDTxn(deleteTxn, aclRole.ID))
require.NoError(t, deleteTxn.Insert(tableIndex, &IndexEntry{TableACLRoles, 20}))
deleteTxn.Txn.Commit()

// Pull the events from the stream.
deleteChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ACLRolesDeleteByIDRequestType}
receivedDeleteChange := eventsFromChanges(deleteTxn, deleteChange)

// Check the event, and it's payload are what we are expecting.
require.Len(t, receivedDeleteChange.Events, 1)
require.Equal(t, structs.TopicACLRole, receivedDeleteChange.Events[0].Topic)
require.Equal(t, aclRole.ID, receivedDeleteChange.Events[0].Key)
require.Equal(t, aclRole.Name, receivedDeleteChange.Events[0].FilterKeys[0])
require.Equal(t, structs.TypeACLRoleDeleted, receivedDeleteChange.Events[0].Type)
require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index)

eventPayload = receivedChange.Events[0].Payload.(*structs.ACLRoleStreamEvent)
require.Equal(t, aclRole, eventPayload.ACLRole)
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()

Expand Down
34 changes: 22 additions & 12 deletions nomad/state/state_store_acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,8 @@ func (s *StateStore) DeleteACLRolesByID(
defer txn.Abort()

for _, roleID := range roleIDs {

existing, err := txn.First(TableACLRoles, indexID, roleID)
if err != nil {
return fmt.Errorf("ACL role lookup failed: %v", err)
}
if existing == nil {
return errors.New("ACL role not found")
}

// Delete the existing entry from the table.
if err := txn.Delete(TableACLRoles, existing); err != nil {
return fmt.Errorf("ACL role deletion failed: %v", err)
if err := s.deleteACLRoleByIDTxn(txn, roleID); err != nil {
return err
}
}

Expand All @@ -206,6 +196,26 @@ func (s *StateStore) DeleteACLRolesByID(
return txn.Commit()
}

// deleteACLRoleByIDTxn deletes a single ACL role from the state store using the
// provided write transaction. It is the responsibility of the caller to update
// the index table.
func (s *StateStore) deleteACLRoleByIDTxn(txn *txn, roleID string) error {

existing, err := txn.First(TableACLRoles, indexID, roleID)
if err != nil {
return fmt.Errorf("ACL role lookup failed: %v", err)
}
if existing == nil {
return errors.New("ACL role not found")
}

// Delete the existing entry from the table.
if err := txn.Delete(TableACLRoles, existing); err != nil {
return fmt.Errorf("ACL role deletion failed: %v", err)
}
return nil
}

// GetACLRoles returns an iterator that contains all ACL roles stored within
// state.
func (s *StateStore) GetACLRoles(ws memdb.WatchSet) (memdb.ResultIterator, error) {
Expand Down
57 changes: 43 additions & 14 deletions nomad/stream/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
Expand Down Expand Up @@ -213,22 +214,22 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
return !aclAllowsSubscription(aclObj, sub.req)
})

case *structs.ACLPolicyEvent:
// Re-evaluate each subscriptions permissions since a policy
// change may or may not affect the subscription
e.checkSubscriptionsAgainstPolicyChange()
case *structs.ACLPolicyEvent, *structs.ACLRoleStreamEvent:
// Re-evaluate each subscription permission since a policy or
// role change may alter the permissions of the token being
// used for the subscription.
e.checkSubscriptionsAgainstACLChange()
}
}
}
}

// checkSubscriptionsAgainstPolicyChange iterates over the brokers
// subscriptions and evaluates whether the token used for the subscription is
// still valid. If it is not valid it closes the subscriptions belonging to the
// token.
//
// A lock must be held to iterate over the map of subscriptions.
func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() {
// checkSubscriptionsAgainstACLChange iterates over the brokers subscriptions
// and evaluates whether the token used for the subscription is still valid. A
// token may become invalid is the assigned policies or roles have been updated
// which removed the required permission. If the token is no long valid, the
// subscription is closed.
func (e *EventBroker) checkSubscriptionsAgainstACLChange() {
e.mu.Lock()
defer e.mu.Unlock()

Expand Down Expand Up @@ -257,22 +258,28 @@ func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() {
}
}

func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) {
func aclObjFromSnapshotForTokenSecretID(
aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) {

aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID)
if err != nil {
return nil, err
}

if aclToken == nil {
return nil, errors.New("no token for secret ID")
return nil, structs.ErrTokenNotFound
}
if aclToken.IsExpired(time.Now().UTC()) {
return nil, structs.ErrTokenExpired
}

// Check if this is a management token
if aclToken.Type == structs.ACLManagementToken {
return acl.ManagementACL, nil
}

aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies))
aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles))

for _, policyName := range aclToken.Policies {
policy, err := aclSnapshot.ACLPolicyByName(nil, policyName)
if err != nil || policy == nil {
Expand All @@ -281,12 +288,34 @@ func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache *
aclPolicies = append(aclPolicies, policy)
}

// Iterate all the token role links, so we can unpack these and identify
// the ACL policies.
for _, roleLink := range aclToken.Roles {

role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID)
if err != nil {
return nil, err
}
if role == nil {
continue
}

for _, policyLink := range role.Policies {
policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name)
if err != nil || policy == nil {
return nil, errors.New("error finding acl policy")
}
aclPolicies = append(aclPolicies, policy)
}
}

return structs.CompileACLObject(aclCache, aclPolicies)
}

type ACLTokenProvider interface {
ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error)
ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error)
GetACLRoleByID(ws memdb.WatchSet, roleID string) (*structs.ACLRole, error)
}

type ACLDelegate interface {
Expand Down
Loading

0 comments on commit 1c9b4e3

Please sign in to comment.