Skip to content

Commit

Permalink
Indexing permissions as part of the Elastic Agent policy (#187)
Browse files Browse the repository at this point in the history
* Indexing permissions as part of the Elastic Agent policy

* Delay the output key generation. Now it is dirven by the policy monitor policy updates.

(cherry picked from commit a743bad)

# Conflicts:
#	cmd/fleet/handleCheckin.go
#	cmd/fleet/handleEnroll.go
  • Loading branch information
aleksmaus authored and mergify-bot committed Mar 30, 2021
1 parent 066cbb9 commit d5ca9d9
Show file tree
Hide file tree
Showing 11 changed files with 492 additions and 53 deletions.
71 changes: 60 additions & 11 deletions cmd/fleet/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
<<<<<<< HEAD
=======
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
>>>>>>> a743bad... Indexing permissions as part of the Elastic Agent policy (#187)

"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog/log"
Expand All @@ -47,7 +52,7 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro

// Don't log connection drops
if err != context.Canceled {
log.Error().Err(err).Str("id", id).Int("code", code).Msg("Fail checkin")
log.Error().Err(err).Str("id", id).Int("code", code).Msg("fail checkin")
}
http.Error(w, err.Error(), code)
}
Expand Down Expand Up @@ -194,7 +199,7 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st
return err
}

log.Trace().RawJSON("resp", data).Msg("Checkin response")
log.Trace().RawJSON("resp", data).Msg("checkin response")

return nil
}
Expand All @@ -210,7 +215,7 @@ func (ct *CheckinT) resolveSeqNo(ctx context.Context, req CheckinRequest, agent
sn, err = ct.tr.Resolve(ctx, ackToken)
if err != nil {
if errors.Is(err, dl.ErrNotFound) {
log.Debug().Str("token", ackToken).Str("agent_id", agent.Id).Msg("Revision token not found")
log.Debug().Str("token", ackToken).Str("agent_id", agent.Id).Msg("revision token not found")
err = nil
} else {
return
Expand Down Expand Up @@ -263,7 +268,8 @@ func parsePolicy(ctx context.Context, bulker bulk.Bulk, agentId string, p model.
// 4) Inject default api key into structure
// 5) Re-serialize and return AgentResp structure

var actionObj map[string]interface{}
// using json.RawMessage to avoid the full json de-serialization
var actionObj map[string]json.RawMessage
if err := json.Unmarshal(p.Data, &actionObj); err != nil {
return nil, err
}
Expand All @@ -275,26 +281,69 @@ func parsePolicy(ctx context.Context, bulker bulk.Bulk, agentId string, p model.
return nil, err
}

// Check if need to generate a new output api key
var (
hash string
needKey bool
roles []byte
)

if agent.DefaultApiKey == "" {
defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker.Client(), agent.Id, "default")
hash, roles, err = policy.GetRoleDescriptors(actionObj[policy.OutputPermissionsProperty])
if err != nil {
return nil, err
}
needKey = true
log.Debug().Str("agentId", agentId).Msg("agent API key is not present")
} else {
hash, roles, needKey, err = policy.CheckOutputPermissionsChanged(agent.PolicyOutputPermissionsHash, actionObj[policy.OutputPermissionsProperty])
if err != nil {
return nil, err
}
if needKey {
log.Debug().Str("agentId", agentId).Msg("policy output permissions changed")
} else {
log.Debug().Str("agentId", agentId).Msg("policy output permissions are the same")
}
}

if needKey {
log.Debug().Str("agentId", agentId).RawJSON("roles", roles).Str("hash", hash).Msg("generating a new API key")
defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker.Client(), agent.Id, policy.DefaultOutputName, roles)
if err != nil {
return nil, err
}
agent.DefaultApiKey = defaultOutputApiKey.Agent()
agent.DefaultApiKeyId = defaultOutputApiKey.Id
agent.PolicyOutputPermissionsHash = hash

log.Info().Str("agentId", agentId).Msg("Rewriting full agent record to pick up default output key.")
log.Info().Str("agentId", agentId).Msg("rewriting full agent record to pick up default output key.")
if err = dl.IndexAgent(ctx, bulker, agent); err != nil {
return nil, err
}
}

if ok := setMapObj(actionObj, agent.DefaultApiKey, "outputs", "default", "api_key"); !ok {
log.Debug().Msg("Cannot inject api_key into policy")
// Parse the outputs maps in order to inject the api key
const outputsProperty = "outputs"
outputs, err := smap.Parse(actionObj[outputsProperty])
if err != nil {
return nil, err
}

if outputs != nil {
if ok := setMapObj(outputs, agent.DefaultApiKey, "default", "api_key"); !ok {
log.Debug().Msg("cannot inject api_key into policy")
} else {
outputRaw, err := json.Marshal(outputs)
if err != nil {
return nil, err
}
actionObj[outputsProperty] = json.RawMessage(outputRaw)
}
}

dataJSON, err := json.Marshal(struct {
Policy map[string]interface{} `json:"policy"`
Policy map[string]json.RawMessage `json:"policy"`
}{actionObj})
if err != nil {
return nil, err
Expand Down Expand Up @@ -348,7 +397,7 @@ func findAgentByApiKeyId(ctx context.Context, bulker bulk.Bulk, id string) (*mod
func parseMeta(agent *model.Agent, req *CheckinRequest) (fields Fields, err error) {
// Quick comparison first
if bytes.Equal(req.LocalMeta, agent.LocalMetadata) {
log.Trace().Msg("Quick comparing local metadata is equal")
log.Trace().Msg("quick comparing local metadata is equal")
return nil, nil
}

Expand All @@ -365,7 +414,7 @@ func parseMeta(agent *model.Agent, req *CheckinRequest) (fields Fields, err erro
}

if reqLocalMeta != nil && !reflect.DeepEqual(reqLocalMeta, agentLocalMeta) {
log.Info().RawJSON("req.LocalMeta", req.LocalMeta).Msg("Applying new local metadata")
log.Info().RawJSON("req.LocalMeta", req.LocalMeta).Msg("applying new local metadata")
fields = map[string]interface{}{
FieldLocalMetadata: req.LocalMeta,
}
Expand Down
26 changes: 12 additions & 14 deletions cmd/fleet/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,25 +179,14 @@ func _enroll(ctx context.Context, bulker bulk.Bulk, c cache.Cache, req EnrollReq
return nil, err
}

defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker.Client(), agentId, "default")
if err != nil {
return nil, err
}

log.Debug().
Dur("rtt", time.Since(now)).
Str("agentId", agentId).
Str("accessApiKey.Id", accessApiKey.Id).
Str("defaultOutputApiKey.Id", defaultOutputApiKey.Id).
Msg("Created api key")

// Update the local metadata agent id
localMeta, err := updateLocalMetaAgentId(req.Meta.Local, agentId)
if err != nil {
return nil, err
}

agentData := model.Agent{
<<<<<<< HEAD
Active: true,
PolicyId: erec.PolicyId,
Type: req.Type,
Expand All @@ -207,6 +196,15 @@ func _enroll(ctx context.Context, bulker bulk.Bulk, c cache.Cache, req EnrollReq
DefaultApiKeyId: defaultOutputApiKey.Id,
DefaultApiKey: defaultOutputApiKey.Agent(),
ActionSeqNo: dl.UndefinedSeqNo,
=======
Active: true,
PolicyId: erec.PolicyId,
Type: req.Type,
EnrolledAt: now.UTC().Format(time.RFC3339),
LocalMetadata: localMeta,
AccessApiKeyId: accessApiKey.Id,
ActionSeqNo: []int64{sqn.UndefinedSeqNo},
>>>>>>> a743bad... Indexing permissions as part of the Elastic Agent policy (#187)
}

err = createFleetAgent(ctx, bulker, agentId, agentData)
Expand Down Expand Up @@ -309,9 +307,9 @@ func generateAccessApiKey(ctx context.Context, client *elasticsearch.Client, age
return apikey.Create(ctx, client, agentId, "", []byte(kFleetAccessRolesJSON))
}

func generateOutputApiKey(ctx context.Context, client *elasticsearch.Client, agentId string, outputName string) (*apikey.ApiKey, error) {
func generateOutputApiKey(ctx context.Context, client *elasticsearch.Client, agentId, outputName string, roles []byte) (*apikey.ApiKey, error) {
name := fmt.Sprintf("%s:%s", agentId, outputName)
return apikey.Create(ctx, client, name, "", []byte(kFleetOutputRolesJSON))
return apikey.Create(ctx, client, name, "", roles)
}

func (et *EnrollerT) fetchEnrollmentKeyRecord(ctx context.Context, id string) (*model.EnrollmentApiKey, error) {
Expand Down
20 changes: 0 additions & 20 deletions cmd/fleet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,6 @@ const kFleetAccessRolesJSON = `
}
`

const kFleetOutputRolesJSON = `
{
"fleet-output": {
"cluster": ["monitor"],
"index": [{
"names": [
"logs-*",
"metrics-*",
"traces-*",
".logs-endpoint.diagnostic.collection-*"
],
"privileges": [
"auto_configure",
"create_doc"
]
}]
}
}
`

// Wrong: no AAD;
// This defeats the signature check;
// can copy from one to another and will dispatch.
Expand Down
2 changes: 1 addition & 1 deletion fleet-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fleet:
agent:
id: 1e4954ce-af37-4731-9f4a-407b08e69e42
logging:
level: '${LOG_LEVEL:INFO}'
level: '${LOG_LEVEL:DEBUG}'

# Input config provided by the Elastic Agent for the server
#inputs:
Expand Down
25 changes: 18 additions & 7 deletions internal/pkg/dl/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import (
"context"
"encoding/json"
"errors"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"sync"

"github.com/elastic/fleet-server/v7/internal/pkg/dsl"
)

var (
tmplQueryLatestPolicies []byte
initQueryLatestPoliciesOnce sync.Once
tmplQueryLatestPolicies = prepareQueryLatestPolicies()

queryPolicyByID = preparePolicyFindByID()
)

var ErrPolicyLeaderNotFound = errors.New("policy has no leader")
Expand All @@ -36,12 +37,22 @@ func prepareQueryLatestPolicies() []byte {
return root.MustMarshalJSON()
}

func preparePolicyFindByID() *dsl.Tmpl {
tmpl := dsl.NewTmpl()
root := dsl.NewRoot()

root.Size(1)
root.Query().Bool().Filter().Term(FieldPolicyId, tmpl.Bind(FieldPolicyId), nil)
sort := root.Sort()
sort.SortOrder(FieldRevisionIdx, dsl.SortDescend)
sort.SortOrder(FieldCoordinatorIdx, dsl.SortDescend)

tmpl.MustResolve(root)
return tmpl
}

// QueryLatestPolices gets the latest revision for a policy
func QueryLatestPolicies(ctx context.Context, bulker bulk.Bulk, opt ...Option) ([]model.Policy, error) {
initQueryLatestPoliciesOnce.Do(func() {
tmplQueryLatestPolicies = prepareQueryLatestPolicies()
})

o := newOption(FleetPolicies, opt...)
res, err := bulker.Search(ctx, []string{o.indexName}, tmplQueryLatestPolicies)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/es/mapping.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d5ca9d9

Please sign in to comment.