Skip to content

Commit

Permalink
Indexing permissions as part of the Elastic Agent policy (#187) (#192)
Browse files Browse the repository at this point in the history
* Indexing permissions as part of the Elastic Agent policy

* Delay the output key generation. Now it is dirven by the policy monitor policy updates.

(cherry picked from commit a743bad)

# Conflicts:
#	cmd/fleet/handleCheckin.go
#	cmd/fleet/handleEnroll.go

Co-authored-by: Aleksandr Maus <[email protected]>
  • Loading branch information
mergify[bot] and aleksmaus authored Mar 30, 2021
1 parent 23bb5b3 commit 2729c2e
Show file tree
Hide file tree
Showing 11 changed files with 485 additions and 62 deletions.
67 changes: 56 additions & 11 deletions cmd/fleet/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"

"github.com/julienschmidt/httprouter"
Expand Down Expand Up @@ -48,7 +49,7 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro

// Don't log connection drops
if err != context.Canceled {
log.Error().Err(err).Str("id", id).Int("code", code).Msg("Fail checkin")
log.Error().Err(err).Str("id", id).Int("code", code).Msg("fail checkin")
}
http.Error(w, err.Error(), code)
}
Expand Down Expand Up @@ -195,7 +196,7 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st
return err
}

log.Trace().RawJSON("resp", data).Msg("Checkin response")
log.Trace().RawJSON("resp", data).Msg("checkin response")

return nil
}
Expand All @@ -211,7 +212,7 @@ func (ct *CheckinT) resolveSeqNo(ctx context.Context, req CheckinRequest, agent
sn, err = ct.tr.Resolve(ctx, ackToken)
if err != nil {
if errors.Is(err, dl.ErrNotFound) {
log.Debug().Str("token", ackToken).Str("agent_id", agent.Id).Msg("Revision token not found")
log.Debug().Str("token", ackToken).Str("agent_id", agent.Id).Msg("revision token not found")
err = nil
} else {
return
Expand Down Expand Up @@ -264,7 +265,8 @@ func parsePolicy(ctx context.Context, bulker bulk.Bulk, agentId string, p model.
// 4) Inject default api key into structure
// 5) Re-serialize and return AgentResp structure

var actionObj map[string]interface{}
// using json.RawMessage to avoid the full json de-serialization
var actionObj map[string]json.RawMessage
if err := json.Unmarshal(p.Data, &actionObj); err != nil {
return nil, err
}
Expand All @@ -276,26 +278,69 @@ func parsePolicy(ctx context.Context, bulker bulk.Bulk, agentId string, p model.
return nil, err
}

// Check if need to generate a new output api key
var (
hash string
needKey bool
roles []byte
)

if agent.DefaultApiKey == "" {
defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker.Client(), agent.Id, "default")
hash, roles, err = policy.GetRoleDescriptors(actionObj[policy.OutputPermissionsProperty])
if err != nil {
return nil, err
}
needKey = true
log.Debug().Str("agentId", agentId).Msg("agent API key is not present")
} else {
hash, roles, needKey, err = policy.CheckOutputPermissionsChanged(agent.PolicyOutputPermissionsHash, actionObj[policy.OutputPermissionsProperty])
if err != nil {
return nil, err
}
if needKey {
log.Debug().Str("agentId", agentId).Msg("policy output permissions changed")
} else {
log.Debug().Str("agentId", agentId).Msg("policy output permissions are the same")
}
}

if needKey {
log.Debug().Str("agentId", agentId).RawJSON("roles", roles).Str("hash", hash).Msg("generating a new API key")
defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker.Client(), agent.Id, policy.DefaultOutputName, roles)
if err != nil {
return nil, err
}
agent.DefaultApiKey = defaultOutputApiKey.Agent()
agent.DefaultApiKeyId = defaultOutputApiKey.Id
agent.PolicyOutputPermissionsHash = hash

log.Info().Str("agentId", agentId).Msg("Rewriting full agent record to pick up default output key.")
log.Info().Str("agentId", agentId).Msg("rewriting full agent record to pick up default output key.")
if err = dl.IndexAgent(ctx, bulker, agent); err != nil {
return nil, err
}
}

if ok := setMapObj(actionObj, agent.DefaultApiKey, "outputs", "default", "api_key"); !ok {
log.Debug().Msg("Cannot inject api_key into policy")
// Parse the outputs maps in order to inject the api key
const outputsProperty = "outputs"
outputs, err := smap.Parse(actionObj[outputsProperty])
if err != nil {
return nil, err
}

if outputs != nil {
if ok := setMapObj(outputs, agent.DefaultApiKey, "default", "api_key"); !ok {
log.Debug().Msg("cannot inject api_key into policy")
} else {
outputRaw, err := json.Marshal(outputs)
if err != nil {
return nil, err
}
actionObj[outputsProperty] = json.RawMessage(outputRaw)
}
}

dataJSON, err := json.Marshal(struct {
Policy map[string]interface{} `json:"policy"`
Policy map[string]json.RawMessage `json:"policy"`
}{actionObj})
if err != nil {
return nil, err
Expand Down Expand Up @@ -349,7 +394,7 @@ func findAgentByApiKeyId(ctx context.Context, bulker bulk.Bulk, id string) (*mod
func parseMeta(agent *model.Agent, req *CheckinRequest) (fields Fields, err error) {
// Quick comparison first
if bytes.Equal(req.LocalMeta, agent.LocalMetadata) {
log.Trace().Msg("Quick comparing local metadata is equal")
log.Trace().Msg("quick comparing local metadata is equal")
return nil, nil
}

Expand All @@ -366,7 +411,7 @@ func parseMeta(agent *model.Agent, req *CheckinRequest) (fields Fields, err erro
}

if reqLocalMeta != nil && !reflect.DeepEqual(reqLocalMeta, agentLocalMeta) {
log.Info().RawJSON("req.LocalMeta", req.LocalMeta).Msg("Applying new local metadata")
log.Info().RawJSON("req.LocalMeta", req.LocalMeta).Msg("applying new local metadata")
fields = map[string]interface{}{
FieldLocalMetadata: req.LocalMeta,
}
Expand Down
32 changes: 9 additions & 23 deletions cmd/fleet/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,34 +180,20 @@ func _enroll(ctx context.Context, bulker bulk.Bulk, c cache.Cache, req EnrollReq
return nil, err
}

defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker.Client(), agentId, "default")
if err != nil {
return nil, err
}

log.Debug().
Dur("rtt", time.Since(now)).
Str("agentId", agentId).
Str("accessApiKey.Id", accessApiKey.Id).
Str("defaultOutputApiKey.Id", defaultOutputApiKey.Id).
Msg("Created api key")

// Update the local metadata agent id
localMeta, err := updateLocalMetaAgentId(req.Meta.Local, agentId)
if err != nil {
return nil, err
}

agentData := model.Agent{
Active: true,
PolicyId: erec.PolicyId,
Type: req.Type,
EnrolledAt: now.UTC().Format(time.RFC3339),
LocalMetadata: localMeta,
AccessApiKeyId: accessApiKey.Id,
DefaultApiKeyId: defaultOutputApiKey.Id,
DefaultApiKey: defaultOutputApiKey.Agent(),
ActionSeqNo: []int64{sqn.UndefinedSeqNo},
Active: true,
PolicyId: erec.PolicyId,
Type: req.Type,
EnrolledAt: now.UTC().Format(time.RFC3339),
LocalMetadata: localMeta,
AccessApiKeyId: accessApiKey.Id,
ActionSeqNo: []int64{sqn.UndefinedSeqNo},
}

err = createFleetAgent(ctx, bulker, agentId, agentData)
Expand Down Expand Up @@ -310,9 +296,9 @@ func generateAccessApiKey(ctx context.Context, client *elasticsearch.Client, age
return apikey.Create(ctx, client, agentId, "", []byte(kFleetAccessRolesJSON))
}

func generateOutputApiKey(ctx context.Context, client *elasticsearch.Client, agentId string, outputName string) (*apikey.ApiKey, error) {
func generateOutputApiKey(ctx context.Context, client *elasticsearch.Client, agentId, outputName string, roles []byte) (*apikey.ApiKey, error) {
name := fmt.Sprintf("%s:%s", agentId, outputName)
return apikey.Create(ctx, client, name, "", []byte(kFleetOutputRolesJSON))
return apikey.Create(ctx, client, name, "", roles)
}

func (et *EnrollerT) fetchEnrollmentKeyRecord(ctx context.Context, id string) (*model.EnrollmentApiKey, error) {
Expand Down
20 changes: 0 additions & 20 deletions cmd/fleet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,6 @@ const kFleetAccessRolesJSON = `
}
`

const kFleetOutputRolesJSON = `
{
"fleet-output": {
"cluster": ["monitor"],
"index": [{
"names": [
"logs-*",
"metrics-*",
"traces-*",
".logs-endpoint.diagnostic.collection-*"
],
"privileges": [
"auto_configure",
"create_doc"
]
}]
}
}
`

// Wrong: no AAD;
// This defeats the signature check;
// can copy from one to another and will dispatch.
Expand Down
2 changes: 1 addition & 1 deletion fleet-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fleet:
agent:
id: 1e4954ce-af37-4731-9f4a-407b08e69e42
logging:
level: '${LOG_LEVEL:INFO}'
level: '${LOG_LEVEL:DEBUG}'

# Input config provided by the Elastic Agent for the server
#inputs:
Expand Down
25 changes: 18 additions & 7 deletions internal/pkg/dl/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import (
"context"
"encoding/json"
"errors"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"sync"

"github.com/elastic/fleet-server/v7/internal/pkg/dsl"
)

var (
tmplQueryLatestPolicies []byte
initQueryLatestPoliciesOnce sync.Once
tmplQueryLatestPolicies = prepareQueryLatestPolicies()

queryPolicyByID = preparePolicyFindByID()
)

var ErrPolicyLeaderNotFound = errors.New("policy has no leader")
Expand All @@ -36,12 +37,22 @@ func prepareQueryLatestPolicies() []byte {
return root.MustMarshalJSON()
}

func preparePolicyFindByID() *dsl.Tmpl {
tmpl := dsl.NewTmpl()
root := dsl.NewRoot()

root.Size(1)
root.Query().Bool().Filter().Term(FieldPolicyId, tmpl.Bind(FieldPolicyId), nil)
sort := root.Sort()
sort.SortOrder(FieldRevisionIdx, dsl.SortDescend)
sort.SortOrder(FieldCoordinatorIdx, dsl.SortDescend)

tmpl.MustResolve(root)
return tmpl
}

// QueryLatestPolices gets the latest revision for a policy
func QueryLatestPolicies(ctx context.Context, bulker bulk.Bulk, opt ...Option) ([]model.Policy, error) {
initQueryLatestPoliciesOnce.Do(func() {
tmplQueryLatestPolicies = prepareQueryLatestPolicies()
})

o := newOption(FleetPolicies, opt...)
res, err := bulker.Search(ctx, []string{o.indexName}, tmplQueryLatestPolicies)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/es/mapping.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 99 additions & 0 deletions internal/pkg/policy/output_permissions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package policy

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"

"github.com/elastic/fleet-server/v7/internal/pkg/smap"
)

const (
DefaultOutputName = "default"
OutputPermissionsProperty = "output_permissions"
)

var (
ErrOutputPermissionsNotFound = errors.New("output_permissions not found")
ErrDefaultOutputNotFound = errors.New("default output not found")
ErrInvalidPermissionsFormat = errors.New("invalid permissions format")
)

func GetRoleDescriptors(outputPermissionsRaw []byte) (hash string, roles []byte, err error) {
if len(outputPermissionsRaw) == 0 {
return
}

output, err := getDefaultOutputMap(outputPermissionsRaw)
if err != nil {
return
}

// Calculating the hash of the original output map
hash, err = output.Hash()
if err != nil {
return
}

roles, err = json.Marshal(output)
if err != nil {
return
}

return
}

func CheckOutputPermissionsChanged(hash string, outputPermissionsRaw []byte) (newHash string, roles []byte, changed bool, err error) {
if len(outputPermissionsRaw) == 0 {
return
}

// shotcuircut, hash and compare as is, if equals the json is serialized consistently from jsacascript and go
newHash, err = getDefaultOutputHash(outputPermissionsRaw)
if err != nil {
return
}
if hash == newHash {
return hash, nil, false, nil
}

newHash, roles, err = GetRoleDescriptors(outputPermissionsRaw)
if err != nil {
return
}

return newHash, roles, (newHash != hash), nil
}

func getDefaultOutputHash(outputPermissionsRaw []byte) (hash string, err error) {
var m map[string]json.RawMessage
err = json.Unmarshal(outputPermissionsRaw, &m)
if err != nil {
return
}

if len(m[DefaultOutputName]) == 0 {
return
}

b := sha256.Sum256(m[DefaultOutputName])
return hex.EncodeToString(b[:]), nil
}

func getDefaultOutputMap(outputPermissionsRaw []byte) (defaultOutput smap.Map, err error) {
outputPermissions, err := smap.Parse(outputPermissionsRaw)
if err != nil {
return
}

defaultOutput = outputPermissions.GetMap(DefaultOutputName)
if defaultOutput == nil {
err = ErrDefaultOutputNotFound
}
return
}
Loading

0 comments on commit 2729c2e

Please sign in to comment.