diff --git a/cmd/fleet/handleCheckin.go b/cmd/fleet/handleCheckin.go index bd6132e40..2be8bd786 100644 --- a/cmd/fleet/handleCheckin.go +++ b/cmd/fleet/handleCheckin.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" "github.com/elastic/fleet-server/v7/internal/pkg/policy" + "github.com/elastic/fleet-server/v7/internal/pkg/smap" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" "github.com/julienschmidt/httprouter" @@ -48,7 +49,7 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro // Don't log connection drops if err != context.Canceled { - log.Error().Err(err).Str("id", id).Int("code", code).Msg("Fail checkin") + log.Error().Err(err).Str("id", id).Int("code", code).Msg("fail checkin") } http.Error(w, err.Error(), code) } @@ -195,7 +196,7 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st return err } - log.Trace().RawJSON("resp", data).Msg("Checkin response") + log.Trace().RawJSON("resp", data).Msg("checkin response") return nil } @@ -211,7 +212,7 @@ func (ct *CheckinT) resolveSeqNo(ctx context.Context, req CheckinRequest, agent sn, err = ct.tr.Resolve(ctx, ackToken) if err != nil { if errors.Is(err, dl.ErrNotFound) { - log.Debug().Str("token", ackToken).Str("agent_id", agent.Id).Msg("Revision token not found") + log.Debug().Str("token", ackToken).Str("agent_id", agent.Id).Msg("revision token not found") err = nil } else { return @@ -264,7 +265,8 @@ func parsePolicy(ctx context.Context, bulker bulk.Bulk, agentId string, p model. // 4) Inject default api key into structure // 5) Re-serialize and return AgentResp structure - var actionObj map[string]interface{} + // using json.RawMessage to avoid the full json de-serialization + var actionObj map[string]json.RawMessage if err := json.Unmarshal(p.Data, &actionObj); err != nil { return nil, err } @@ -276,26 +278,69 @@ func parsePolicy(ctx context.Context, bulker bulk.Bulk, agentId string, p model. return nil, err } + // Check if need to generate a new output api key + var ( + hash string + needKey bool + roles []byte + ) + if agent.DefaultApiKey == "" { - defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker.Client(), agent.Id, "default") + hash, roles, err = policy.GetRoleDescriptors(actionObj[policy.OutputPermissionsProperty]) + if err != nil { + return nil, err + } + needKey = true + log.Debug().Str("agentId", agentId).Msg("agent API key is not present") + } else { + hash, roles, needKey, err = policy.CheckOutputPermissionsChanged(agent.PolicyOutputPermissionsHash, actionObj[policy.OutputPermissionsProperty]) + if err != nil { + return nil, err + } + if needKey { + log.Debug().Str("agentId", agentId).Msg("policy output permissions changed") + } else { + log.Debug().Str("agentId", agentId).Msg("policy output permissions are the same") + } + } + + if needKey { + log.Debug().Str("agentId", agentId).RawJSON("roles", roles).Str("hash", hash).Msg("generating a new API key") + defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker.Client(), agent.Id, policy.DefaultOutputName, roles) if err != nil { return nil, err } agent.DefaultApiKey = defaultOutputApiKey.Agent() agent.DefaultApiKeyId = defaultOutputApiKey.Id + agent.PolicyOutputPermissionsHash = hash - log.Info().Str("agentId", agentId).Msg("Rewriting full agent record to pick up default output key.") + log.Info().Str("agentId", agentId).Msg("rewriting full agent record to pick up default output key.") if err = dl.IndexAgent(ctx, bulker, agent); err != nil { return nil, err } } - if ok := setMapObj(actionObj, agent.DefaultApiKey, "outputs", "default", "api_key"); !ok { - log.Debug().Msg("Cannot inject api_key into policy") + // Parse the outputs maps in order to inject the api key + const outputsProperty = "outputs" + outputs, err := smap.Parse(actionObj[outputsProperty]) + if err != nil { + return nil, err + } + + if outputs != nil { + if ok := setMapObj(outputs, agent.DefaultApiKey, "default", "api_key"); !ok { + log.Debug().Msg("cannot inject api_key into policy") + } else { + outputRaw, err := json.Marshal(outputs) + if err != nil { + return nil, err + } + actionObj[outputsProperty] = json.RawMessage(outputRaw) + } } dataJSON, err := json.Marshal(struct { - Policy map[string]interface{} `json:"policy"` + Policy map[string]json.RawMessage `json:"policy"` }{actionObj}) if err != nil { return nil, err @@ -349,7 +394,7 @@ func findAgentByApiKeyId(ctx context.Context, bulker bulk.Bulk, id string) (*mod func parseMeta(agent *model.Agent, req *CheckinRequest) (fields Fields, err error) { // Quick comparison first if bytes.Equal(req.LocalMeta, agent.LocalMetadata) { - log.Trace().Msg("Quick comparing local metadata is equal") + log.Trace().Msg("quick comparing local metadata is equal") return nil, nil } @@ -366,7 +411,7 @@ func parseMeta(agent *model.Agent, req *CheckinRequest) (fields Fields, err erro } if reqLocalMeta != nil && !reflect.DeepEqual(reqLocalMeta, agentLocalMeta) { - log.Info().RawJSON("req.LocalMeta", req.LocalMeta).Msg("Applying new local metadata") + log.Info().RawJSON("req.LocalMeta", req.LocalMeta).Msg("applying new local metadata") fields = map[string]interface{}{ FieldLocalMetadata: req.LocalMeta, } diff --git a/cmd/fleet/handleEnroll.go b/cmd/fleet/handleEnroll.go index c733e409a..280ce91b3 100644 --- a/cmd/fleet/handleEnroll.go +++ b/cmd/fleet/handleEnroll.go @@ -180,18 +180,6 @@ func _enroll(ctx context.Context, bulker bulk.Bulk, c cache.Cache, req EnrollReq return nil, err } - defaultOutputApiKey, err := generateOutputApiKey(ctx, bulker.Client(), agentId, "default") - if err != nil { - return nil, err - } - - log.Debug(). - Dur("rtt", time.Since(now)). - Str("agentId", agentId). - Str("accessApiKey.Id", accessApiKey.Id). - Str("defaultOutputApiKey.Id", defaultOutputApiKey.Id). - Msg("Created api key") - // Update the local metadata agent id localMeta, err := updateLocalMetaAgentId(req.Meta.Local, agentId) if err != nil { @@ -199,15 +187,13 @@ func _enroll(ctx context.Context, bulker bulk.Bulk, c cache.Cache, req EnrollReq } agentData := model.Agent{ - Active: true, - PolicyId: erec.PolicyId, - Type: req.Type, - EnrolledAt: now.UTC().Format(time.RFC3339), - LocalMetadata: localMeta, - AccessApiKeyId: accessApiKey.Id, - DefaultApiKeyId: defaultOutputApiKey.Id, - DefaultApiKey: defaultOutputApiKey.Agent(), - ActionSeqNo: []int64{sqn.UndefinedSeqNo}, + Active: true, + PolicyId: erec.PolicyId, + Type: req.Type, + EnrolledAt: now.UTC().Format(time.RFC3339), + LocalMetadata: localMeta, + AccessApiKeyId: accessApiKey.Id, + ActionSeqNo: []int64{sqn.UndefinedSeqNo}, } err = createFleetAgent(ctx, bulker, agentId, agentData) @@ -310,9 +296,9 @@ func generateAccessApiKey(ctx context.Context, client *elasticsearch.Client, age return apikey.Create(ctx, client, agentId, "", []byte(kFleetAccessRolesJSON)) } -func generateOutputApiKey(ctx context.Context, client *elasticsearch.Client, agentId string, outputName string) (*apikey.ApiKey, error) { +func generateOutputApiKey(ctx context.Context, client *elasticsearch.Client, agentId, outputName string, roles []byte) (*apikey.ApiKey, error) { name := fmt.Sprintf("%s:%s", agentId, outputName) - return apikey.Create(ctx, client, name, "", []byte(kFleetOutputRolesJSON)) + return apikey.Create(ctx, client, name, "", roles) } func (et *EnrollerT) fetchEnrollmentKeyRecord(ctx context.Context, id string) (*model.EnrollmentApiKey, error) { diff --git a/cmd/fleet/schema.go b/cmd/fleet/schema.go index 926aabaf0..77265c5e1 100644 --- a/cmd/fleet/schema.go +++ b/cmd/fleet/schema.go @@ -35,26 +35,6 @@ const kFleetAccessRolesJSON = ` } ` -const kFleetOutputRolesJSON = ` - { - "fleet-output": { - "cluster": ["monitor"], - "index": [{ - "names": [ - "logs-*", - "metrics-*", - "traces-*", - ".logs-endpoint.diagnostic.collection-*" - ], - "privileges": [ - "auto_configure", - "create_doc" - ] - }] - } - } -` - // Wrong: no AAD; // This defeats the signature check; // can copy from one to another and will dispatch. diff --git a/internal/pkg/dl/policies.go b/internal/pkg/dl/policies.go index f5277e731..a0591507a 100644 --- a/internal/pkg/dl/policies.go +++ b/internal/pkg/dl/policies.go @@ -8,16 +8,17 @@ import ( "context" "encoding/json" "errors" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/model" - "sync" "github.com/elastic/fleet-server/v7/internal/pkg/dsl" ) var ( - tmplQueryLatestPolicies []byte - initQueryLatestPoliciesOnce sync.Once + tmplQueryLatestPolicies = prepareQueryLatestPolicies() + + queryPolicyByID = preparePolicyFindByID() ) var ErrPolicyLeaderNotFound = errors.New("policy has no leader") @@ -36,12 +37,22 @@ func prepareQueryLatestPolicies() []byte { return root.MustMarshalJSON() } +func preparePolicyFindByID() *dsl.Tmpl { + tmpl := dsl.NewTmpl() + root := dsl.NewRoot() + + root.Size(1) + root.Query().Bool().Filter().Term(FieldPolicyId, tmpl.Bind(FieldPolicyId), nil) + sort := root.Sort() + sort.SortOrder(FieldRevisionIdx, dsl.SortDescend) + sort.SortOrder(FieldCoordinatorIdx, dsl.SortDescend) + + tmpl.MustResolve(root) + return tmpl +} + // QueryLatestPolices gets the latest revision for a policy func QueryLatestPolicies(ctx context.Context, bulker bulk.Bulk, opt ...Option) ([]model.Policy, error) { - initQueryLatestPoliciesOnce.Do(func() { - tmplQueryLatestPolicies = prepareQueryLatestPolicies() - }) - o := newOption(FleetPolicies, opt...) res, err := bulker.Search(ctx, []string{o.indexName}, tmplQueryLatestPolicies) if err != nil { diff --git a/internal/pkg/es/mapping.go b/internal/pkg/es/mapping.go index 0aa36732e..0a45abfba 100644 --- a/internal/pkg/es/mapping.go +++ b/internal/pkg/es/mapping.go @@ -128,6 +128,9 @@ const ( "policy_id": { "type": "keyword" }, + "policy_output_permissions_hash": { + "type": "keyword" + }, "policy_revision_idx": { "type": "integer" }, diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 485892a31..389c2fa30 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -132,6 +132,9 @@ type Agent struct { // The policy ID for the Elastic Agent PolicyId string `json:"policy_id,omitempty"` + // The policy output permissions hash + PolicyOutputPermissionsHash string `json:"policy_output_permissions_hash,omitempty"` + // The current policy revision_idx for the Elastic Agent PolicyRevisionIdx int64 `json:"policy_revision_idx,omitempty"` diff --git a/internal/pkg/policy/output_permissions.go b/internal/pkg/policy/output_permissions.go new file mode 100644 index 000000000..465de52f9 --- /dev/null +++ b/internal/pkg/policy/output_permissions.go @@ -0,0 +1,99 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package policy + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + + "github.com/elastic/fleet-server/v7/internal/pkg/smap" +) + +const ( + DefaultOutputName = "default" + OutputPermissionsProperty = "output_permissions" +) + +var ( + ErrOutputPermissionsNotFound = errors.New("output_permissions not found") + ErrDefaultOutputNotFound = errors.New("default output not found") + ErrInvalidPermissionsFormat = errors.New("invalid permissions format") +) + +func GetRoleDescriptors(outputPermissionsRaw []byte) (hash string, roles []byte, err error) { + if len(outputPermissionsRaw) == 0 { + return + } + + output, err := getDefaultOutputMap(outputPermissionsRaw) + if err != nil { + return + } + + // Calculating the hash of the original output map + hash, err = output.Hash() + if err != nil { + return + } + + roles, err = json.Marshal(output) + if err != nil { + return + } + + return +} + +func CheckOutputPermissionsChanged(hash string, outputPermissionsRaw []byte) (newHash string, roles []byte, changed bool, err error) { + if len(outputPermissionsRaw) == 0 { + return + } + + // shotcuircut, hash and compare as is, if equals the json is serialized consistently from jsacascript and go + newHash, err = getDefaultOutputHash(outputPermissionsRaw) + if err != nil { + return + } + if hash == newHash { + return hash, nil, false, nil + } + + newHash, roles, err = GetRoleDescriptors(outputPermissionsRaw) + if err != nil { + return + } + + return newHash, roles, (newHash != hash), nil +} + +func getDefaultOutputHash(outputPermissionsRaw []byte) (hash string, err error) { + var m map[string]json.RawMessage + err = json.Unmarshal(outputPermissionsRaw, &m) + if err != nil { + return + } + + if len(m[DefaultOutputName]) == 0 { + return + } + + b := sha256.Sum256(m[DefaultOutputName]) + return hex.EncodeToString(b[:]), nil +} + +func getDefaultOutputMap(outputPermissionsRaw []byte) (defaultOutput smap.Map, err error) { + outputPermissions, err := smap.Parse(outputPermissionsRaw) + if err != nil { + return + } + + defaultOutput = outputPermissions.GetMap(DefaultOutputName) + if defaultOutput == nil { + err = ErrDefaultOutputNotFound + } + return +} diff --git a/internal/pkg/policy/output_permissions_test.go b/internal/pkg/policy/output_permissions_test.go new file mode 100644 index 000000000..472d9ee74 --- /dev/null +++ b/internal/pkg/policy/output_permissions_test.go @@ -0,0 +1,218 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !integration + +package policy + +import ( + "testing" + + "github.com/elastic/fleet-server/v7/internal/pkg/smap" + "github.com/google/go-cmp/cmp" +) + +const ( + fallbackPermissions = ` + { + "default": { + "_fallback": { + "indices": [ + { + "names": [ + "logs-*", + "metrics-*", + "traces-*", + ".logs-endpoint.diagnostic.collection-*" + ], + "privileges": [ + "auto_configure", + "create_doc" + ] + } + ] + } + } + } +` + fallbackPermissionsHash = "48e2e1dfe0e64df0dd841e96e28bb82ff6273432e9ebccca259a3278ff86ee4c" + + outputPermissions = ` + { + "default": { + "nginx-logs-1": { + "indices": [ + { + "names": [ + "logs-nginx.access-*", + "logs-nginx.error-*" + ], + "privileges": [ + "append" + ] + } + ] + }, + "nginx-metrics-1": { + "indices": [ + { + "names": [ + "metrics-nginx.substatus-*" + ], + "privileges": [ + "append" + ] + } + ] + }, + "endpoint-policy1-part1": { + "indices": [ + { + "names": [ + ".logs-endpoint.diagnostic.collection-*" + ], + "privileges": [ + "read" + ] + } + ] + }, + "endpoint-policy1-part2": { + "indices": [ + { + "names": [ + "metrics-endpoint-*" + ], + "privileges": [ + "append" + ] + } + ] + } + } + } +` + outputPermissionsHash = "42c955b5df44eec374dc66a97ab8c2045a88583af499aba81345c4221e473ead" + + resultDescriptors = ` +{ + "endpoint-policy1-part1": { + "indices": [ + { + "names": [ + ".logs-endpoint.diagnostic.collection-*" + ], + "privileges": [ + "read" + ] + } + ] + }, + "endpoint-policy1-part2": { + "indices": [ + { + "names": [ + "metrics-endpoint-*" + ], + "privileges": [ + "append" + ] + } + ] + }, + "nginx-logs-1": { + "indices": [ + { + "names": [ + "logs-nginx.access-*", + "logs-nginx.error-*" + ], + "privileges": [ + "append" + ] + } + ] + }, + "nginx-metrics-1": { + "indices": [ + { + "names": [ + "metrics-nginx.substatus-*" + ], + "privileges": [ + "append" + ] + } + ] + } +} +` +) + +func TestGetRoleDescriptors(t *testing.T) { + + hash, roles, err := GetRoleDescriptors([]byte(outputPermissions)) + if err != nil { + t.Fatal(err) + } + + m, err := smap.Parse([]byte(resultDescriptors)) + if err != nil { + t.Fatal(err) + } + expected, err := m.Marshal() + if err != nil { + t.Fatal(err) + } + + diff := cmp.Diff(expected, roles) + if diff != "" { + t.Fatal(diff) + } + + diff = cmp.Diff(outputPermissionsHash, hash) + if diff != "" { + t.Fatal(diff) + } +} + +func TestCheckOutputPermissionsChanged(t *testing.T) { + // Detect change with initially empty hash + hash, roles, changed, err := CheckOutputPermissionsChanged("", []byte(fallbackPermissions)) + if err != nil { + t.Fatal(err) + } + diff := cmp.Diff(fallbackPermissionsHash, hash) + if diff != "" { + t.Error(diff) + } + + if !changed { + t.Error("expected policy hash change detected") + } + + if len(roles) == 0 { + t.Error("expected non empty roles descriptors") + } + + // Detect no change with the same hash and the content + newHash, roles, changed, err := CheckOutputPermissionsChanged(hash, []byte(fallbackPermissions)) + diff = cmp.Diff(hash, newHash) + if diff != "" { + t.Error(diff) + } + if changed { + t.Error("expected policy hash no change detected") + } + + // Detect the change with the new output permissions + newHash, roles, changed, err = CheckOutputPermissionsChanged(hash, []byte(outputPermissions)) + diff = cmp.Diff(outputPermissionsHash, newHash) + if diff != "" { + t.Error(diff) + } + if !changed { + t.Error("expected policy hash change detected") + } +} diff --git a/internal/pkg/smap/smap.go b/internal/pkg/smap/smap.go new file mode 100644 index 000000000..3c636dffd --- /dev/null +++ b/internal/pkg/smap/smap.go @@ -0,0 +1,74 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package smap + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" +) + +type Map map[string]interface{} + +func (m Map) GetMap(k string) Map { + if m == nil { + return m + } + + v := m[k] + if v != nil { + if m, ok := v.(map[string]interface{}); ok { + return m + } + } + return nil +} + +func (m Map) GetString(k string) string { + if m == nil { + return "" + } + if v := m[k]; v != nil { + if s, ok := v.(string); ok { + return s + } + } + return "" +} + +func (m Map) Hash() (string, error) { + if m == nil { + return "", nil + } + + // Hashing through the json encoder + h := sha256.New() + enc := json.NewEncoder(h) + err := enc.Encode(m) + if err != nil { + return "", err + } + + return hex.EncodeToString(h.Sum(nil)), nil +} + +func (m Map) Marshal() ([]byte, error) { + if m == nil { + return nil, nil + } + return json.Marshal(m) +} + +func Parse(data []byte) (Map, error) { + if len(data) == 0 { + return nil, nil + } + + var m Map + + err := json.Unmarshal(data, &m) + + return m, err +} diff --git a/model/schema.json b/model/schema.json index 8386463a5..9dcfe3d38 100644 --- a/model/schema.json +++ b/model/schema.json @@ -394,6 +394,10 @@ "description": "The current policy coordinator for the Elastic Agent", "type": "integer" }, + "policy_output_permissions_hash": { + "description": "The policy output permissions hash", + "type": "string" + }, "last_updated": { "description": "Date/time the Elastic Agent was last updated", "type": "string",