From 26938a466ec7a185fc7d7cd6ce302115a84116bc Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Mon, 19 Sep 2022 17:16:49 +0200 Subject: [PATCH 1/5] Revert "Revert "Fix v8.5.0 migration painless script" (#1878)" This reverts commit ef9ca2bd78641e48e8ac2356989dfbfeebc4efa6. --- cmd/fleet/main.go | 14 +- internal/pkg/api/handleAck.go | 49 ++- internal/pkg/api/handleAck_test.go | 39 ++- internal/pkg/api/handleCheckin.go | 15 +- internal/pkg/api/handleEnroll.go | 11 +- internal/pkg/apikey/apikey.go | 61 ++++ .../pkg/apikey/apikey_integration_test.go | 103 ++++-- internal/pkg/apikey/create.go | 1 - internal/pkg/apikey/get.go | 68 ---- internal/pkg/apikey/invalidate.go | 1 - internal/pkg/apikey/metadata.go | 20 +- internal/pkg/bulk/opBulk.go | 16 +- internal/pkg/coordinator/monitor.go | 17 +- .../coordinator/monitor_integration_test.go | 23 +- internal/pkg/dl/agent.go | 22 +- internal/pkg/dl/agent_integration_test.go | 45 +++ internal/pkg/dl/constants.go | 32 +- internal/pkg/dl/migration.go | 290 ++++++++++++----- internal/pkg/dl/migration_integration_test.go | 295 ++++++++++++++++++ internal/pkg/es/error.go | 18 +- internal/pkg/model/ext.go | 28 +- internal/pkg/model/ext_test.go | 55 +++- internal/pkg/model/schema.go | 53 +++- internal/pkg/policy/parsed_policy.go | 10 +- internal/pkg/policy/parsed_policy_test.go | 1 - internal/pkg/policy/policy_output.go | 244 +++++++++------ .../policy/policy_output_integration_test.go | 127 ++++++++ internal/pkg/policy/policy_output_test.go | 137 ++++++-- internal/pkg/testing/esutil/bootstrap.go | 2 +- internal/pkg/testing/setup.go | 16 +- model/schema.json | 89 ++++-- 31 files changed, 1445 insertions(+), 457 deletions(-) delete mode 100644 internal/pkg/apikey/get.go create mode 100644 internal/pkg/dl/migration_integration_test.go create mode 100644 internal/pkg/policy/policy_output_integration_test.go diff --git a/cmd/fleet/main.go b/cmd/fleet/main.go index a28ec50c3..976564cb8 100644 --- a/cmd/fleet/main.go +++ b/cmd/fleet/main.go @@ -821,17 +821,21 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g * remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version) if err != nil { if len(remoteVersion) != 0 { - return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", f.bi.Version, remoteVersion, err) + return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", + f.bi.Version, remoteVersion, err) } return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err) } - // Run migrations; current safe to do in background. That may change in the future. - g.Go(loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error { + // Run migrations + loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error { return dl.Migrate(ctx, bulker) - })) + }) + if err = loggedMigration(); err != nil { + return fmt.Errorf("failed to run subsystems: %w", err) + } - // Run schduler for periodic GC/cleanup + // Run scheduler for periodic GC/cleanup gcCfg := cfg.Inputs[0].Server.GC sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval)) if err != nil { diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index b3febcfd8..1261d8f6c 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -15,6 +15,8 @@ import ( "strings" "time" + "github.com/pkg/errors" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/config" @@ -24,7 +26,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/policy" - "github.com/pkg/errors" "github.com/julienschmidt/httprouter" "github.com/rs/zerolog" @@ -337,8 +338,9 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag Int64("rev.coordinatorIdx", rev.CoordinatorIdx). Msg("ack policy revision") - if ok && rev.PolicyID == agent.PolicyID && (rev.RevisionIdx > currRev || - (rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) { + if ok && rev.PolicyID == agent.PolicyID && + (rev.RevisionIdx > currRev || + (rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) { found = true currRev = rev.RevisionIdx currCoord = rev.CoordinatorIdx @@ -349,17 +351,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag return nil } - sz := len(agent.DefaultAPIKeyHistory) - if sz > 0 { - ids := make([]string, sz) - for i := 0; i < sz; i++ { - ids[i] = agent.DefaultAPIKeyHistory[i].ID - } - log.Info().Strs("ids", ids).Msg("Invalidate old API keys") - if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil { - log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys") - } - } + ack.invalidateAPIKeys(ctx, agent) body := makeUpdatePolicyBody( agent.PolicyID, @@ -385,8 +377,24 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag return errors.Wrap(err, "handlePolicyChange update") } +func (ack *AckT) invalidateAPIKeys(ctx context.Context, agent *model.Agent) { + var ids []string + for _, out := range agent.Outputs { + for _, k := range out.ToRetireAPIKeyIds { + ids = append(ids, k.ID) + } + } + + if len(ids) > 0 { + log.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys") + if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil { + log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys") + } + } +} + func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error { - apiKeys := _getAPIKeyIDs(agent) + apiKeys := agent.APIKeyIDs() if len(apiKeys) > 0 { zlog = zlog.With().Strs(LogAPIKeyID, apiKeys).Logger() @@ -441,17 +449,6 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * return nil } -func _getAPIKeyIDs(agent *model.Agent) []string { - keys := make([]string, 0, 1) - if agent.AccessAPIKeyID != "" { - keys = append(keys, agent.AccessAPIKeyID) - } - if agent.DefaultAPIKeyID != "" { - keys = append(keys, agent.DefaultAPIKeyID) - } - return keys -} - // Generate an update script that validates that the policy_id // has not changed underneath us by an upstream process (Kibana or otherwise). // We have a race condition where a user could have assigned a new policy to diff --git a/internal/pkg/api/handleAck_test.go b/internal/pkg/api/handleAck_test.go index 90c961456..60a265bd4 100644 --- a/internal/pkg/api/handleAck_test.go +++ b/internal/pkg/api/handleAck_test.go @@ -15,13 +15,14 @@ import ( "net/http" "testing" + "github.com/google/go-cmp/cmp" + "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -439,3 +440,39 @@ func TestHandleAckEvents(t *testing.T) { }) } } + +func TestInvalidateAPIKeys(t *testing.T) { + toRetire1 := []model.ToRetireAPIKeyIdsItems{{ + ID: "toRetire1", + }} + toRetire2 := []model.ToRetireAPIKeyIdsItems{{ + ID: "toRetire2_0", + }, { + ID: "toRetire2_1", + }} + var toRetire3 []model.ToRetireAPIKeyIdsItems + + want := []string{"toRetire1", "toRetire2_0", "toRetire2_1"} + + agent := model.Agent{ + Outputs: map[string]*model.PolicyOutput{ + "1": {ToRetireAPIKeyIds: toRetire1}, + "2": {ToRetireAPIKeyIds: toRetire2}, + "3": {ToRetireAPIKeyIds: toRetire3}, + }, + } + + bulker := ftesting.NewMockBulk() + bulker.On("APIKeyInvalidate", + context.Background(), mock.MatchedBy(func(ids []string) bool { + // if A contains B and B contains A => A = B + return assert.Subset(t, ids, want) && + assert.Subset(t, want, ids) + })). + Return(nil) + + ack := &AckT{bulk: bulker} + ack.invalidateAPIKeys(context.Background(), &agent) + + bulker.AssertExpectations(t) +} diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 721ee4538..2752dd147 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -10,6 +10,7 @@ import ( "compress/gzip" "context" "encoding/json" + "fmt" "math/rand" "net/http" "reflect" @@ -60,7 +61,6 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro Logger() err := rt.ct.handleCheckin(&zlog, w, r, id) - if err != nil { cntCheckin.IncError(err) resp := NewHTTPErrResp(err) @@ -430,13 +430,13 @@ func convertActions(agentID string, actions []model.Action) ([]ActionResp, strin // func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agentID string, pp *policy.ParsedPolicy) (*ActionResp, error) { zlog = zlog.With(). - Str("ctx", "processPolicy"). - Int64("policyRevision", pp.Policy.RevisionIdx). - Int64("policyCoordinator", pp.Policy.CoordinatorIdx). + Str("fleet.ctx", "processPolicy"). + Int64("fleet.policyRevision", pp.Policy.RevisionIdx). + Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx). Str(LogPolicyID, pp.Policy.PolicyID). Logger() - // Repull and decode the agent object. Do not trust the cache. + // Repull and decode the agent object. Do not trust the cache. agent, err := dl.FindAgent(ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID) if err != nil { zlog.Error().Err(err).Msg("fail find agent record") @@ -446,7 +446,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a // Parse the outputs maps in order to prepare the outputs const outputsProperty = "outputs" outputs, err := smap.Parse(pp.Fields[outputsProperty]) - if err != nil { return nil, err } @@ -458,9 +457,9 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a // Iterate through the policy outputs and prepare them for _, policyOutput := range pp.Outputs { err = policyOutput.Prepare(ctx, zlog, bulker, &agent, outputs) - if err != nil { - return nil, err + return nil, fmt.Errorf("failed to prepare output %q: %w", + policyOutput.Name, err) } } diff --git a/internal/pkg/api/handleEnroll.go b/internal/pkg/api/handleEnroll.go index a3c2f9833..7c5b1dd5a 100644 --- a/internal/pkg/api/handleEnroll.go +++ b/internal/pkg/api/handleEnroll.go @@ -53,7 +53,6 @@ type EnrollerT struct { } func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bulk, c cache.Cache) (*EnrollerT, error) { - log.Info(). Interface("limits", cfg.Limits.EnrollLimit). Msg("Setting config enroll_limit") @@ -187,7 +186,13 @@ func (et *EnrollerT) processRequest(rb *rollback.Rollback, zlog zerolog.Logger, return et._enroll(r.Context(), rb, zlog, req, erec.PolicyID, ver) } -func (et *EnrollerT) _enroll(ctx context.Context, rb *rollback.Rollback, zlog zerolog.Logger, req *EnrollRequest, policyID, ver string) (*EnrollResponse, error) { +func (et *EnrollerT) _enroll( + ctx context.Context, + rb *rollback.Rollback, + zlog zerolog.Logger, + req *EnrollRequest, + policyID, + ver string) (*EnrollResponse, error) { if req.SharedID != "" { // TODO: Support pre-existing install @@ -427,7 +432,7 @@ func generateAccessAPIKey(ctx context.Context, bulk bulk.Bulk, agentID string) ( agentID, "", []byte(kFleetAccessRolesJSON), - apikey.NewMetadata(agentID, apikey.TypeAccess), + apikey.NewMetadata(agentID, "", apikey.TypeAccess), ) } diff --git a/internal/pkg/apikey/apikey.go b/internal/pkg/apikey/apikey.go index 4924a647b..4134f2b0d 100644 --- a/internal/pkg/apikey/apikey.go +++ b/internal/pkg/apikey/apikey.go @@ -6,12 +6,18 @@ package apikey import ( + "bytes" + "context" "encoding/base64" + "encoding/json" "errors" "fmt" "net/http" "strings" "unicode/utf8" + + "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" ) const ( @@ -28,6 +34,61 @@ var ( var AuthKey = http.CanonicalHeaderKey("Authorization") +// APIKeyMetadata tracks Metadata associated with an APIKey. +type APIKeyMetadata struct { + ID string + Metadata Metadata +} + +// Read gathers APIKeyMetadata from Elasticsearch using the given client. +func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) { + opts := []func(*esapi.SecurityGetAPIKeyRequest){ + client.Security.GetAPIKey.WithContext(ctx), + client.Security.GetAPIKey.WithID(id), + } + + res, err := client.Security.GetAPIKey( + opts..., + ) + if err != nil { + return nil, fmt.Errorf("request to elasticsearch failed: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + return nil, fmt.Errorf("%s: %w", res.String(), ErrAPIKeyNotFound) + } + + type APIKeyResponse struct { + ID string `json:"id"` + Metadata Metadata `json:"metadata"` + } + type GetAPIKeyResponse struct { + APIKeys []APIKeyResponse `json:"api_keys"` + } + + var buff bytes.Buffer + if _, err := buff.ReadFrom(res.Body); err != nil { + return nil, fmt.Errorf("could not read from response body: %w", err) + } + + var resp GetAPIKeyResponse + if err = json.Unmarshal(buff.Bytes(), &resp); err != nil { + return nil, fmt.Errorf( + "could not Unmarshal elasticsearch GetAPIKeyResponse: %w", err) + } + + if len(resp.APIKeys) == 0 { + return nil, ErrAPIKeyNotFound + } + + first := resp.APIKeys[0] + return &APIKeyMetadata{ + ID: first.ID, + Metadata: first.Metadata, + }, nil +} + // APIKey is used to represent an Elasticsearch API Key. type APIKey struct { ID string diff --git a/internal/pkg/apikey/apikey_integration_test.go b/internal/pkg/apikey/apikey_integration_test.go index 5c4e3b97c..72f410d99 100644 --- a/internal/pkg/apikey/apikey_integration_test.go +++ b/internal/pkg/apikey/apikey_integration_test.go @@ -30,7 +30,7 @@ const testFleetRoles = ` } ` -func TestCreateAPIKeyWithMetadata(t *testing.T) { +func TestRead(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() @@ -44,44 +44,83 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) { t.Fatal(err) } - // Create the key - agentID := uuid.Must(uuid.NewV4()).String() - name := uuid.Must(uuid.NewV4()).String() - akey, err := Create(ctx, es, name, "", "true", []byte(testFleetRoles), - NewMetadata(agentID, TypeAccess)) - if err != nil { - t.Fatal(err) + // Try to get the key that doesn't exist, expect ErrApiKeyNotFound + _, err = Read(ctx, es, "0000000000000") + if !errors.Is(err, ErrAPIKeyNotFound) { + t.Errorf("Unexpected error type: %v", err) } - // Get the key and verify that metadata was saved correctly - aKeyMeta, err := Read(ctx, es, akey.ID) - if err != nil { - t.Fatal(err) +} +func TestCreateAPIKeyWithMetadata(t *testing.T) { + tts := []struct { + name string + outputName string + }{ + {name: "with metadata.output_name", outputName: "a_output_name"}, + {name: "without metadata.output_name"}, } - diff := cmp.Diff(ManagedByFleetServer, aKeyMeta.Metadata.ManagedBy) - if diff != "" { - t.Error(diff) - } + for _, tt := range tts { + t.Run(tt.name, func(t *testing.T) { + ctx, cn := context.WithCancel(context.Background()) + defer cn() - diff = cmp.Diff(true, aKeyMeta.Metadata.Managed) - if diff != "" { - t.Error(diff) - } + cfg := elasticsearch.Config{ + Username: "elastic", + Password: "changeme", + } - diff = cmp.Diff(agentID, aKeyMeta.Metadata.AgentID) - if diff != "" { - t.Error(diff) - } + es, err := elasticsearch.NewClient(cfg) + if err != nil { + t.Fatal(err) + } - diff = cmp.Diff(TypeAccess.String(), aKeyMeta.Metadata.Type) - if diff != "" { - t.Error(diff) - } + // Create the API key + agentID := uuid.Must(uuid.NewV4()).String() + name := uuid.Must(uuid.NewV4()).String() + outputName := tt.outputName + apiKey, err := Create( + ctx, + es, + name, + "", + "true", + []byte(testFleetRoles), + NewMetadata(agentID, outputName, TypeAccess)) + if err != nil { + t.Fatal(err) + } - // Try to get the key that doesn't exists, expect ErrApiKeyNotFound - _, err = Read(ctx, es, "0000000000000") - if !errors.Is(err, ErrAPIKeyNotFound) { - t.Errorf("Unexpected error type: %v", err) + // Get the API key and verify that the metadata was saved correctly + aKeyMeta, err := Read(ctx, es, apiKey.ID) + if err != nil { + t.Fatal(err) + } + + diff := cmp.Diff(ManagedByFleetServer, aKeyMeta.Metadata.ManagedBy) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(true, aKeyMeta.Metadata.Managed) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(agentID, aKeyMeta.Metadata.AgentID) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(outputName, aKeyMeta.Metadata.OutputName) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(TypeAccess.String(), aKeyMeta.Metadata.Type) + if diff != "" { + t.Error(diff) + } + }) } } diff --git a/internal/pkg/apikey/create.go b/internal/pkg/apikey/create.go index f3cee99f8..de61390c3 100644 --- a/internal/pkg/apikey/create.go +++ b/internal/pkg/apikey/create.go @@ -42,7 +42,6 @@ func Create(ctx context.Context, client *elasticsearch.Client, name, ttl, refres bytes.NewReader(body), opts..., ) - if err != nil { return nil, err } diff --git a/internal/pkg/apikey/get.go b/internal/pkg/apikey/get.go deleted file mode 100644 index 5d931c670..000000000 --- a/internal/pkg/apikey/get.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package apikey - -import ( - "context" - "encoding/json" - - "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" - "github.com/pkg/errors" -) - -// APIKetMetadata tracks Metadata associated with an APIKey. -type APIKeyMetadata struct { - ID string - Metadata Metadata -} - -// Read gathers APIKeyMetadata from Elasticsearch using the given client. -func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) { - - opts := []func(*esapi.SecurityGetAPIKeyRequest){ - client.Security.GetAPIKey.WithContext(ctx), - client.Security.GetAPIKey.WithID(id), - } - - res, err := client.Security.GetAPIKey( - opts..., - ) - - if err != nil { - return nil, err - } - defer res.Body.Close() - - if res.IsError() { - err = errors.Wrap(ErrAPIKeyNotFound, res.String()) - return nil, err - } - - type APIKeyResponse struct { - ID string `json:"id"` - Metadata Metadata `json:"metadata"` - } - type GetAPIKeyResponse struct { - APIKeys []APIKeyResponse `json:"api_keys"` - } - - var resp GetAPIKeyResponse - d := json.NewDecoder(res.Body) - if err = d.Decode(&resp); err != nil { - return nil, err - } - - if len(resp.APIKeys) == 0 { - return nil, ErrAPIKeyNotFound - } - - first := resp.APIKeys[0] - - return &APIKeyMetadata{ - ID: first.ID, - Metadata: first.Metadata, - }, nil -} diff --git a/internal/pkg/apikey/invalidate.go b/internal/pkg/apikey/invalidate.go index 421662388..6c5d5d304 100644 --- a/internal/pkg/apikey/invalidate.go +++ b/internal/pkg/apikey/invalidate.go @@ -38,7 +38,6 @@ func Invalidate(ctx context.Context, client *elasticsearch.Client, ids ...string bytes.NewReader(body), opts..., ) - if err != nil { return fmt.Errorf("InvalidateAPIKey: %w", err) } diff --git a/internal/pkg/apikey/metadata.go b/internal/pkg/apikey/metadata.go index c80997c7b..d00380c01 100644 --- a/internal/pkg/apikey/metadata.go +++ b/internal/pkg/apikey/metadata.go @@ -19,18 +19,20 @@ func (t Type) String() string { // Metadata is additional information associated with an APIKey. type Metadata struct { - AgentID string `json:"agent_id,omitempty"` - Managed bool `json:"managed,omitempty"` - ManagedBy string `json:"managed_by,omitempty"` - Type string `json:"type,omitempty"` + AgentID string `json:"agent_id,omitempty"` + Managed bool `json:"managed,omitempty"` + ManagedBy string `json:"managed_by,omitempty"` + OutputName string `json:"output_name,omitempty"` + Type string `json:"type,omitempty"` } // NewMetadata returns Metadata for the given agentID. -func NewMetadata(agentID string, typ Type) Metadata { +func NewMetadata(agentID string, outputName string, typ Type) Metadata { return Metadata{ - AgentID: agentID, - Managed: true, - ManagedBy: ManagedByFleetServer, - Type: typ.String(), + AgentID: agentID, + Managed: true, + ManagedBy: ManagedByFleetServer, + OutputName: outputName, + Type: typ.String(), } } diff --git a/internal/pkg/bulk/opBulk.go b/internal/pkg/bulk/opBulk.go index 50b2c47e0..d47ba9592 100644 --- a/internal/pkg/bulk/opBulk.go +++ b/internal/pkg/bulk/opBulk.go @@ -7,6 +7,7 @@ package bulk import ( "bytes" "context" + "errors" "fmt" "time" @@ -187,7 +188,6 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { } res, err := req.Do(ctx, b.es) - if err != nil { log.Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do") return err @@ -217,12 +217,18 @@ func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { var blk bulkIndexerResponse blk.Items = make([]bulkStubItem, 0, queueCnt) + // TODO: We're loosing information abut the errors, we should check a way + // to return the full error ES returns if err = easyjson.Unmarshal(buf.Bytes(), &blk); err != nil { - log.Error(). - Err(err). + log.Err(err). Str("mod", kModBulk). - Msg("Unmarshal error") - return err + Msg("flushBulk failed, could not unmarshal ES response") + return fmt.Errorf("flushBulk failed, could not unmarshal ES response: %w", err) + } + if blk.HasErrors { + // We lack information to properly correlate this error with what has failed. + // Thus, for now it'd be more noise than information outside an investigation. + log.Debug().Err(errors.New(buf.String())).Msg("Bulk call: Es returned an error") } log.Trace(). diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 53870e58e..2242305f8 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -508,7 +508,7 @@ func runUnenroller(ctx context.Context, bulker bulk.Bulk, policyID string, unenr func runUnenrollerWork(ctx context.Context, bulker bulk.Bulk, policyID string, unenrollTimeout time.Duration, zlog zerolog.Logger, agentsIndex string) error { agents, err := dl.FindOfflineAgents(ctx, bulker, policyID, unenrollTimeout, dl.WithIndexName(agentsIndex)) - if err != nil || len(agents) == 0 { + if err != nil { return err } @@ -540,11 +540,13 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a dl.FieldUnenrolledReason: unenrolledReasonTimeout, dl.FieldUpdatedAt: now, } + body, err := fields.Marshal() if err != nil { return err } - apiKeys := getAPIKeyIDs(agent) + + apiKeys := agent.APIKeyIDs() zlog = zlog.With(). Str(logger.AgentID, agent.Id). @@ -567,17 +569,6 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a return err } -func getAPIKeyIDs(agent *model.Agent) []string { - keys := make([]string, 0, 1) - if agent.AccessAPIKeyID != "" { - keys = append(keys, agent.AccessAPIKeyID) - } - if agent.DefaultAPIKeyID != "" { - keys = append(keys, agent.DefaultAPIKeyID) - } - return keys -} - func waitWithContext(ctx context.Context, to time.Duration) error { t := time.NewTimer(to) defer t.Stop() diff --git a/internal/pkg/coordinator/monitor_integration_test.go b/internal/pkg/coordinator/monitor_integration_test.go index ffef699d1..defc4a9c7 100644 --- a/internal/pkg/coordinator/monitor_integration_test.go +++ b/internal/pkg/coordinator/monitor_integration_test.go @@ -159,7 +159,7 @@ func TestMonitorUnenroller(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, apikey.TypeAccess), + apikey.NewMetadata(agentID, "", apikey.TypeAccess), ) require.NoError(t, err) outputKey, err := bulker.APIKeyCreate( @@ -167,20 +167,21 @@ func TestMonitorUnenroller(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, apikey.TypeAccess), + apikey.NewMetadata(agentID, "default", apikey.TypeAccess), ) require.NoError(t, err) // add agent that should be unenrolled sixAgo := time.Now().UTC().Add(-6 * time.Minute) agentBody, err := json.Marshal(model.Agent{ - AccessAPIKeyID: accessKey.ID, - DefaultAPIKeyID: outputKey.ID, - Active: true, - EnrolledAt: sixAgo.Format(time.RFC3339), - LastCheckin: sixAgo.Format(time.RFC3339), - PolicyID: policy1Id, - UpdatedAt: sixAgo.Format(time.RFC3339), + AccessAPIKeyID: accessKey.ID, + Outputs: map[string]*model.PolicyOutput{ + "default": {APIKeyID: outputKey.ID}}, + Active: true, + EnrolledAt: sixAgo.Format(time.RFC3339), + LastCheckin: sixAgo.Format(time.RFC3339), + PolicyID: policy1Id, + UpdatedAt: sixAgo.Format(time.RFC3339), }) require.NoError(t, err) _, err = bulker.Create(ctx, agentsIndex, agentID, agentBody) @@ -306,7 +307,7 @@ func TestMonitorUnenrollerSetAndClear(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, apikey.TypeAccess), + apikey.NewMetadata(agentID, "", apikey.TypeAccess), ) require.NoError(t, err) outputKey, err := bulker.APIKeyCreate( @@ -314,7 +315,7 @@ func TestMonitorUnenrollerSetAndClear(t *testing.T) { agentID, "", []byte(""), - apikey.NewMetadata(agentID, apikey.TypeAccess), + apikey.NewMetadata(agentID, "default", apikey.TypeAccess), ) require.NoError(t, err) diff --git a/internal/pkg/dl/agent.go b/internal/pkg/dl/agent.go index 1d52082f7..a4871fa73 100644 --- a/internal/pkg/dl/agent.go +++ b/internal/pkg/dl/agent.go @@ -6,6 +6,7 @@ package dl import ( "context" + "fmt" "time" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" @@ -48,19 +49,23 @@ func prepareOfflineAgentsByPolicyID() *dsl.Tmpl { return tmpl } -func FindAgent(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, name string, v interface{}, opt ...Option) (agent model.Agent, err error) { +func FindAgent(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, name string, v interface{}, opt ...Option) (model.Agent, error) { o := newOption(FleetAgents, opt...) res, err := SearchWithOneParam(ctx, bulker, tmpl, o.indexName, name, v) if err != nil { - return + return model.Agent{}, fmt.Errorf("failed searching for agent: %w", err) } if len(res.Hits) == 0 { - return agent, ErrNotFound + return model.Agent{}, ErrNotFound } - err = res.Hits[0].Unmarshal(&agent) - return agent, err + var agent model.Agent + if err = res.Hits[0].Unmarshal(&agent); err != nil { + return model.Agent{}, fmt.Errorf("could not unmarshal ES document into model.Agent: %w", err) + } + + return agent, nil } func FindOfflineAgents(ctx context.Context, bulker bulk.Bulk, policyID string, unenrollTimeout time.Duration, opt ...Option) ([]model.Agent, error) { @@ -71,18 +76,19 @@ func FindOfflineAgents(ctx context.Context, bulker bulk.Bulk, policyID string, u FieldLastCheckin: past, }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed searching for agent: %w", err) } if len(res.Hits) == 0 { - return nil, nil + return nil, ErrNotFound } agents := make([]model.Agent, len(res.Hits)) for i, hit := range res.Hits { if err := hit.Unmarshal(&agents[i]); err != nil { - return nil, err + return nil, fmt.Errorf("could not unmarshal ES document into model.Agent: %w", err) } } + return agents, nil } diff --git a/internal/pkg/dl/agent_integration_test.go b/internal/pkg/dl/agent_integration_test.go index 4e65ddb94..3baab6c7e 100644 --- a/internal/pkg/dl/agent_integration_test.go +++ b/internal/pkg/dl/agent_integration_test.go @@ -108,3 +108,48 @@ func TestFindOfflineAgents(t *testing.T) { require.Len(t, agents, 2) assert.EqualValues(t, []string{twoDayOldID, threeDayOldID}, []string{agents[0].Id, agents[1].Id}) } + +func TestFindAgent_NewModel(t *testing.T) { + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + + now := time.Now().UTC() + nowStr := now.Format(time.RFC3339) + + policyID := uuid.Must(uuid.NewV4()).String() + agentID := uuid.Must(uuid.NewV4()).String() + + wantOutputs := map[string]*model.PolicyOutput{ + "default": { + Type: "elasticsearch", + APIKey: "TestFindNewModelAgent_APIKey", + ToRetireAPIKeyIds: []model.ToRetireAPIKeyIdsItems{ + { + ID: "TestFindNewModelAgent_APIKeyID_invalidated", + RetiredAt: "TestFindNewModelAgent_APIKeyID_invalidated_at"}, + }, + APIKeyID: "TestFindNewModelAgent_APIKeyID", + PermissionsHash: "TestFindNewModelAgent_PermisPolicysionsHash", + }, + } + body, err := json.Marshal(model.Agent{ + PolicyID: policyID, + Active: true, + LastCheckin: nowStr, + LastCheckinStatus: "", + UpdatedAt: nowStr, + EnrolledAt: nowStr, + Outputs: wantOutputs, + }) + require.NoError(t, err) + + _, err = bulker.Create( + context.Background(), index, agentID, body, bulk.WithRefresh()) + require.NoError(t, err) + + agent, err := FindAgent( + context.Background(), bulker, QueryAgentByID, FieldID, agentID, WithIndexName(index)) + require.NoError(t, err) + + assert.Equal(t, agentID, agent.Id) + assert.Equal(t, wantOutputs, agent.Outputs) +} diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 9fb0614da..034c23541 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -27,22 +27,22 @@ const ( FieldMaxSeqNo = "max_seq_no" FieldActionSeqNo = "action_seq_no" - FieldActionID = "action_id" - FieldPolicyID = "policy_id" - FieldRevisionIdx = "revision_idx" - FieldCoordinatorIdx = "coordinator_idx" - FieldLastCheckin = "last_checkin" - FieldLastCheckinStatus = "last_checkin_status" - FieldLocalMetadata = "local_metadata" - FieldPolicyRevisionIdx = "policy_revision_idx" - FieldPolicyCoordinatorIdx = "policy_coordinator_idx" - FieldDefaultAPIKey = "default_api_key" - FieldDefaultAPIKeyID = "default_api_key_id" //nolint:gosec // field name - FieldDefaultAPIKeyHistory = "default_api_key_history" //nolint:gosec // field name - FieldPolicyOutputPermissionsHash = "policy_output_permissions_hash" - FieldUnenrolledReason = "unenrolled_reason" - FieldAgentVersion = "version" - FieldAgent = "agent" + FieldActionID = "action_id" + FieldAgent = "agent" + FieldAgentVersion = "version" + FieldCoordinatorIdx = "coordinator_idx" + FieldLastCheckin = "last_checkin" + FieldLastCheckinStatus = "last_checkin_status" + FieldLocalMetadata = "local_metadata" + FieldPolicyCoordinatorIdx = "policy_coordinator_idx" + FieldPolicyID = "policy_id" + FieldPolicyOutputAPIKey = "api_key" + FieldPolicyOutputAPIKeyID = "api_key_id" + FieldPolicyOutputPermissionsHash = "permissions_hash" + FieldPolicyOutputToRetireAPIKeyIDs = "to_retire_api_key_ids" //nolint:gosec // false positive + FieldPolicyRevisionIdx = "policy_revision_idx" + FieldRevisionIdx = "revision_idx" + FieldUnenrolledReason = "unenrolled_reason" FieldActive = "active" FieldUpdatedAt = "updated_at" diff --git a/internal/pkg/dl/migration.go b/internal/pkg/dl/migration.go index 4beb26741..4107dfd38 100644 --- a/internal/pkg/dl/migration.go +++ b/internal/pkg/dl/migration.go @@ -12,59 +12,73 @@ import ( "net/http" "time" - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/dsl" - "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/pkg/errors" "github.com/rs/zerolog/log" -) -func Migrate(ctx context.Context, bulker bulk.Bulk) error { - return migrateAgentMetadata(ctx, bulker) -} - -// FleetServer 7.15 added a new *AgentMetadata field to the Agent record. -// This field was populated in new enrollments in 7.15 and later; however, the -// change was not backported to support 7.14. The security team is reliant on the -// existence of this field in 7.16, so the following migration was added to -// support upgrade from 7.14. -// -// It is currently safe to run this in the background; albeit with some -// concern on conflicts. The conflict risk exists regardless as N Fleet Servers -// can be run in parallel at the same time. -// -// As the update only occurs once, the 99.9% case is a noop. -func migrateAgentMetadata(ctx context.Context, bulker bulk.Bulk) error { + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/dsl" +) - root := dsl.NewRoot() - root.Query().Bool().MustNot().Exists("agent.id") +type ( + migrationFn func(context.Context, bulk.Bulk) error + migrationBodyFn func() (string, string, []byte, error) + migrationResponse struct { + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Total int `json:"total"` + Updated int `json:"updated"` + Deleted int `json:"deleted"` + Batches int `json:"batches"` + VersionConflicts int `json:"version_conflicts"` + Noops int `json:"noops"` + Retries struct { + Bulk int `json:"bulk"` + Search int `json:"search"` + } `json:"retries"` + Failures []json.RawMessage `json:"failures"` + } +) - painless := "ctx._source.agent = [:]; ctx._source.agent.id = ctx._id;" - root.Param("script", painless) +// timeNow is used to get the current time. It should be replaced for testing. +var timeNow = time.Now - body, err := root.MarshalJSON() - if err != nil { - return err +// Migrate applies, in sequence, the migration functions. Currently, each migration +// function is responsible to ensure it only applies the migration if needed, +// being a no-op otherwise. +func Migrate(ctx context.Context, bulker bulk.Bulk) error { + for _, fn := range []migrationFn{migrateTov7_15, migrateToV8_5} { + if err := fn(ctx, bulker); err != nil { + return err + } } -LOOP: + return nil +} + +func migrate(ctx context.Context, bulker bulk.Bulk, fn migrationBodyFn) (int, error) { + var updatedDocs int for { - nConflicts, err := updateAgentMetadata(ctx, bulker, body) + name, index, body, err := fn() if err != nil { - return err - } - if nConflicts == 0 { - break LOOP + return updatedDocs, fmt.Errorf(": %w", err) } - time.Sleep(time.Second) + resp, err := applyMigration(ctx, name, index, bulker, body) + if err != nil { + return updatedDocs, fmt.Errorf("failed to apply migration %q: %w", + name, err) + } + updatedDocs += resp.Updated + if resp.VersionConflicts == 0 { + break + } } - return nil + return updatedDocs, nil } -func updateAgentMetadata(ctx context.Context, bulker bulk.Bulk, body []byte) (int, error) { +func applyMigration(ctx context.Context, name string, index string, bulker bulk.Bulk, body []byte) (migrationResponse, error) { start := time.Now() client := bulker.Client() @@ -78,59 +92,193 @@ func updateAgentMetadata(ctx context.Context, bulker bulk.Bulk, body []byte) (in client.UpdateByQuery.WithConflicts("proceed"), } - res, err := client.UpdateByQuery([]string{FleetAgents}, opts...) - + res, err := client.UpdateByQuery([]string{index}, opts...) if err != nil { - return 0, err + return migrationResponse{}, err } if res.IsError() { if res.StatusCode == http.StatusNotFound { // Ignore index not created yet; nothing to upgrade - return 0, nil + return migrationResponse{}, nil } - return 0, fmt.Errorf("Migrate UpdateByQuery %s", res.String()) + return migrationResponse{}, fmt.Errorf("migrate %s UpdateByQuery failed: %s", + name, res.String()) } - resp := struct { - Took int `json:"took"` - TimedOut bool `json:"timed_out"` - Total int `json:"total"` - Updated int `json:"updated"` - Deleted int `json:"deleted"` - Batches int `json:"batches"` - VersionConflicts int `json:"version_conflicts"` - Noops int `json:"noops"` - Retries struct { - Bulk int `json:"bulk"` - Search int `json:"search"` - } `json:"retries"` - Failures []json.RawMessage `json:"failures"` - }{} + resp := migrationResponse{} decoder := json.NewDecoder(res.Body) if err := decoder.Decode(&resp); err != nil { - return 0, errors.Wrap(err, "decode UpdateByQuery response") + return migrationResponse{}, errors.Wrap(err, "decode UpdateByQuery response") } log.Info(). - Int("took", resp.Took). - Bool("timed_out", resp.TimedOut). - Int("total", resp.Total). - Int("updated", resp.Updated). - Int("deleted", resp.Deleted). - Int("batches", resp.Batches). - Int("version_conflicts", resp.VersionConflicts). - Int("noops", resp.Noops). - Int("retries.bulk", resp.Retries.Bulk). - Int("retries.search", resp.Retries.Search). - Dur("rtt", time.Since(start)). - Msg("migrate agent records response") + Str("fleet.migration.name", name). + Int("fleet.migration.es.took", resp.Took). + Bool("fleet.migration.es.timed_out", resp.TimedOut). + Int("fleet.migration.total", resp.Total). + Int("fleet.migration.updated", resp.Updated). + Int("fleet.migration.deleted", resp.Deleted). + Int("fleet.migration.batches", resp.Batches). + Int("fleet.migration.version_conflicts", resp.VersionConflicts). + Int("fleet.migration.noops", resp.Noops). + Int("fleet.migration.retries.bulk", resp.Retries.Bulk). + Int("fleet.migration.retries.search", resp.Retries.Search). + Dur("fleet.migration.total.duration", time.Since(start)). + Msgf("migration %s done", name) for _, fail := range resp.Failures { - log.Error().RawJSON("failure", fail).Msg("migration failure") + log.Error().RawJSON("failure", fail).Msgf("failed applying %s migration", name) + } + + return resp, err +} + +// ============================== V7.15 migration ============================== +func migrateTov7_15(ctx context.Context, bulker bulk.Bulk) error { + log.Debug().Msg("applying migration to v7.15") + _, err := migrate(ctx, bulker, migrateAgentMetadata) + if err != nil { + return fmt.Errorf("v7.15.0 data migration failed: %w", err) + } + + return nil +} + +// FleetServer 7.15 added a new *AgentMetadata field to the Agent record. +// This field was populated in new enrollments in 7.15 and later; however, the +// change was not backported to support 7.14. The security team is reliant on the +// existence of this field in 7.16, so the following migration was added to +// support upgrade from 7.14. +// +// It is currently safe to run this in the background; albeit with some +// concern on conflicts. The conflict risk exists regardless as N Fleet Servers +// can be run in parallel at the same time. +// +// As the update only occurs once, the 99.9% case is a noop. +func migrateAgentMetadata() (string, string, []byte, error) { + const migrationName = "AgentMetadata" + query := dsl.NewRoot() + query.Query().Bool().MustNot().Exists("agent.id") + + painless := "ctx._source.agent = [:]; ctx._source.agent.id = ctx._id;" + query.Param("script", painless) + + body, err := query.MarshalJSON() + if err != nil { + return migrationName, FleetAgents, nil, fmt.Errorf("could not marshal ES query: %w", err) + } + + return migrationName, FleetAgents, body, nil +} + +// ============================== V8.5.0 migration ============================= +// https://github.com/elastic/fleet-server/issues/1672 + +func migrateToV8_5(ctx context.Context, bulker bulk.Bulk) error { + log.Debug().Msg("applying migration to v8.5.0") + migrated, err := migrate(ctx, bulker, migrateAgentOutputs) + if err != nil { + return fmt.Errorf("v8.5.0 data migration failed: %w", err) + } + + // The migration was necessary and indeed run, thus we need to regenerate + // the API keys for all agents. In order to do so, we increase the policy + // coordinator index to force a policy update. + if migrated > 0 { + _, err := migrate(ctx, bulker, migratePolicyCoordinatorIdx) + if err != nil { + return fmt.Errorf("v8.5.0 data migration failed: %w", err) + } + } + + return nil +} + +// migrateAgentOutputs performs the necessary changes on the Agent documents +// to introduce the `Outputs` field. +// +// FleetServer 8.5.0 introduces a new field to the Agent document, Outputs, to +// store the outputs credentials and data. The DefaultAPIKey, DefaultAPIKeyID, +// DefaultAPIKeyHistory and PolicyOutputPermissionsHash are now deprecated in +// favour of the new `Outputs` fields, which maps the output name to its data. +// This change fixes https://github.com/elastic/fleet-server/issues/1672. +// +// The change is backward compatible as the deprecated fields are just set to +// their zero value and an older version of FleetServer can repopulate them. +// However, reverting FleetServer to an older version might cause very issue +// this change fixes. +func migrateAgentOutputs() (string, string, []byte, error) { + const ( + migrationName = "AgentOutputs" + fieldOutputs = "outputs" + fieldRetiredAt = "retiredAt" + ) + + query := dsl.NewRoot() + query.Query().Bool().MustNot().Exists(fieldOutputs) + + fields := map[string]interface{}{fieldRetiredAt: timeNow().UTC().Format(time.RFC3339)} + painless := ` +// set up the new fields +ctx._source['` + fieldOutputs + `']=new HashMap(); +ctx._source['` + fieldOutputs + `']['default']=new HashMap(); +ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids=new ArrayList(); + +// copy 'default_api_key_history' to new 'outputs' field +ctx._source['` + fieldOutputs + `']['default'].type="elasticsearch"; +if (ctx._source.default_api_key_history != null && ctx._source.default_api_key_history.length > 0) { + ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids=ctx._source.default_api_key_history; +} + +Map map = new HashMap(); +map.put("retired_at", params.` + fieldRetiredAt + `); +map.put("id", ctx._source.default_api_key_id); + +// Make current API key empty, so fleet-server will generate a new one +// Add current API jey to be retired +ctx._source['` + fieldOutputs + `']['default'].to_retire_api_key_ids.add(map); +ctx._source['` + fieldOutputs + `']['default'].api_key=""; +ctx._source['` + fieldOutputs + `']['default'].api_key_id=""; +ctx._source['` + fieldOutputs + `']['default'].permissions_hash=ctx._source.policy_output_permissions_hash; + +// Erase deprecated fields +ctx._source.default_api_key_history=null; +ctx._source.default_api_key=""; +ctx._source.default_api_key_id=""; +ctx._source.policy_output_permissions_hash=""; +` + query.Param("script", map[string]interface{}{ + "lang": "painless", + "source": painless, + "params": fields, + }) + + body, err := query.MarshalJSON() + if err != nil { + return migrationName, FleetAgents, nil, fmt.Errorf("could not marshal ES query: %w", err) + } + + return migrationName, FleetAgents, body, nil +} + +// migratePolicyCoordinatorIdx increases the policy's CoordinatorIdx to force +// a policy update ensuring the output data will be migrated to the new +// Agent.Outputs field. See migrateAgentOutputs and https://github.com/elastic/fleet-server/issues/1672 +// for details. +func migratePolicyCoordinatorIdx() (string, string, []byte, error) { + const migrationName = "PolicyCoordinatorIdx" + + query := dsl.NewRoot() + query.Query().MatchAll() + query.Param("script", `ctx._source.coordinator_idx++;`) + + body, err := query.MarshalJSON() + if err != nil { + return migrationName, FleetPolicies, nil, fmt.Errorf("could not marshal ES query: %w", err) } - return resp.VersionConflicts, err + return migrationName, FleetPolicies, body, nil } diff --git a/internal/pkg/dl/migration_integration_test.go b/internal/pkg/dl/migration_integration_test.go new file mode 100644 index 000000000..183502b94 --- /dev/null +++ b/internal/pkg/dl/migration_integration_test.go @@ -0,0 +1,295 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package dl + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/model" + ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" +) + +const nowStr = "2022-08-12T16:50:05Z" + +func createSomeAgents(t *testing.T, n int, apiKey bulk.APIKey, index string, bulker bulk.Bulk) []string { + t.Helper() + + var createdAgents []string + + for i := 0; i < n; i++ { + outputAPIKey := bulk.APIKey{ + ID: fmt.Sprint(apiKey.ID, i), + Key: fmt.Sprint(apiKey.Key, i), + } + + agentID := uuid.Must(uuid.NewV4()).String() + policyID := uuid.Must(uuid.NewV4()).String() + + agentModel := model.Agent{ + PolicyID: policyID, + Active: true, + LastCheckin: nowStr, + LastCheckinStatus: "", + UpdatedAt: nowStr, + EnrolledAt: nowStr, + DefaultAPIKeyID: outputAPIKey.ID, + DefaultAPIKey: outputAPIKey.Agent(), + PolicyOutputPermissionsHash: fmt.Sprint("a_output_permission_SHA_", i), + DefaultAPIKeyHistory: []model.ToRetireAPIKeyIdsItems{ + { + ID: "old_" + outputAPIKey.ID, + RetiredAt: nowStr, + }, + }, + } + + body, err := json.Marshal(agentModel) + require.NoError(t, err) + + _, err = bulker.Create( + context.Background(), index, agentID, body, bulk.WithRefresh()) + require.NoError(t, err) + + createdAgents = append(createdAgents, agentID) + } + + return createdAgents +} + +func createSomePolicies(t *testing.T, n int, index string, bulker bulk.Bulk) []string { + t.Helper() + + var created []string + + for i := 0; i < n; i++ { + now := time.Now().UTC() + nowStr := now.Format(time.RFC3339) + + policyModel := model.Policy{ + ESDocument: model.ESDocument{}, + CoordinatorIdx: int64(i), + Data: nil, + DefaultFleetServer: false, + PolicyID: fmt.Sprint(i), + RevisionIdx: 1, + Timestamp: nowStr, + UnenrollTimeout: 0, + } + + body, err := json.Marshal(policyModel) + require.NoError(t, err) + + policyDocID, err := bulker.Create( + context.Background(), index, "", body, bulk.WithRefresh()) + require.NoError(t, err) + + created = append(created, policyDocID) + } + + return created +} + +func TestPolicyCoordinatorIdx(t *testing.T) { + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetPolicies) + + docIDs := createSomePolicies(t, 25, index, bulker) + + migrated, err := migrate(context.Background(), bulker, migratePolicyCoordinatorIdx) + require.NoError(t, err) + + require.Equal(t, len(docIDs), migrated) + + for i := range docIDs { + policies, err := QueryLatestPolicies( + context.Background(), bulker, WithIndexName(index)) + if err != nil { + assert.NoError(t, err, "failed to query latest policies") // we want to continue even if a single agent fails + continue + } + + var got model.Policy + for _, p := range policies { + if p.PolicyID == fmt.Sprint(i) { + got = p + } + } + + assert.Equal(t, int64(i+1), got.CoordinatorIdx) + } +} + +func TestMigrateOutputs_withDefaultAPIKeyHistory(t *testing.T) { + now, err := time.Parse(time.RFC3339, nowStr) + require.NoError(t, err, "could not parse time "+nowStr) + timeNow = func() time.Time { + return now + } + + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + apiKey := bulk.APIKey{ + ID: "testAgent_", + Key: "testAgent_key_", + } + + agentIDs := createSomeAgents(t, 25, apiKey, index, bulker) + + migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs) + require.NoError(t, err) + + assert.Equal(t, len(agentIDs), migratedAgents) + + for i, id := range agentIDs { + wantOutputType := "elasticsearch" //nolint:goconst // test cases have some duplication + + got, err := FindAgent( + context.Background(), bulker, QueryAgentByID, FieldID, id, WithIndexName(index)) + if err != nil { + assert.NoError(t, err, "failed to find agent ID %q", id) // we want to continue even if a single agent fails + continue + } + + wantToRetireAPIKeyIds := []model.ToRetireAPIKeyIdsItems{ + { + // Current API should be marked to retire after the migration + ID: fmt.Sprintf("%s%d", apiKey.ID, i), + RetiredAt: timeNow().UTC().Format(time.RFC3339)}, + { + ID: fmt.Sprintf("old_%s%d", apiKey.ID, i), + RetiredAt: nowStr}, + } + + // Assert new fields + require.Len(t, got.Outputs, 1) + // Default API key is empty to force fleet-server to regenerate them. + assert.Empty(t, got.Outputs["default"].APIKey) + assert.Empty(t, got.Outputs["default"].APIKeyID) + + assert.Equal(t, wantOutputType, got.Outputs["default"].Type) + assert.Equal(t, + fmt.Sprint("a_output_permission_SHA_", i), + got.Outputs["default"].PermissionsHash) + + // Assert ToRetireAPIKeyIds contains the expected values, regardless of the order. + for _, want := range wantToRetireAPIKeyIds { + var found bool + for _, got := range got.Outputs["default"].ToRetireAPIKeyIds { + found = found || cmp.Equal(want, got) + } + if !found { + t.Errorf("could not find %#v, in %#v", + want, got.Outputs["default"].ToRetireAPIKeyIds) + } + } + + // Assert deprecated fields + assert.Empty(t, got.DefaultAPIKey) + assert.Empty(t, got.DefaultAPIKey) + assert.Empty(t, got.PolicyOutputPermissionsHash) + assert.Nil(t, got.DefaultAPIKeyHistory) + } +} + +func TestMigrateOutputs_nil_DefaultAPIKeyHistory(t *testing.T) { + wantOutputType := "elasticsearch" //nolint:goconst // test cases have some duplication + + now, err := time.Parse(time.RFC3339, nowStr) + require.NoError(t, err, "could not parse time "+nowStr) + timeNow = func() time.Time { + return now + } + + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + apiKey := bulk.APIKey{ + ID: "testAgent_", + Key: "testAgent_key_", + } + + i := 0 + outputAPIKey := bulk.APIKey{ + ID: fmt.Sprint(apiKey.ID, i), + Key: fmt.Sprint(apiKey.Key, i), + } + + agentID := uuid.Must(uuid.NewV4()).String() + policyID := uuid.Must(uuid.NewV4()).String() + + agentModel := model.Agent{ + PolicyID: policyID, + Active: true, + LastCheckin: nowStr, + LastCheckinStatus: "", + UpdatedAt: nowStr, + EnrolledAt: nowStr, + DefaultAPIKeyID: outputAPIKey.ID, + DefaultAPIKey: outputAPIKey.Agent(), + PolicyOutputPermissionsHash: fmt.Sprint("a_output_permission_SHA_", i), + } + + body, err := json.Marshal(agentModel) + require.NoError(t, err) + + _, err = bulker.Create( + context.Background(), index, agentID, body, bulk.WithRefresh()) + require.NoError(t, err) + + migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs) + require.NoError(t, err) + + got, err := FindAgent( + context.Background(), bulker, QueryAgentByID, FieldID, agentID, WithIndexName(index)) + require.NoError(t, err, "failed to find agent ID %q", agentID) // we want to continue even if a single agent fails + + assert.Equal(t, 1, migratedAgents) + + // Assert new fields + require.Len(t, got.Outputs, 1) + // Default API key is empty to force fleet-server to regenerate them. + assert.Empty(t, got.Outputs["default"].APIKey) + assert.Empty(t, got.Outputs["default"].APIKeyID) + assert.Equal(t, wantOutputType, got.Outputs["default"].Type) + assert.Equal(t, + fmt.Sprint("a_output_permission_SHA_", i), + got.Outputs["default"].PermissionsHash) + + // Assert ToRetireAPIKeyIds contains the expected values, regardless of the order. + if assert.Len(t, got.Outputs["default"].ToRetireAPIKeyIds, 1) { + assert.Equal(t, + model.ToRetireAPIKeyIdsItems{ID: outputAPIKey.ID, RetiredAt: nowStr}, + got.Outputs["default"].ToRetireAPIKeyIds[0]) + } + + // Assert deprecated fields + assert.Empty(t, got.DefaultAPIKey) + assert.Empty(t, got.DefaultAPIKey) + assert.Empty(t, got.PolicyOutputPermissionsHash) + assert.Nil(t, got.DefaultAPIKeyHistory) +} + +func TestMigrateOutputs_no_agent_document(t *testing.T) { + now, err := time.Parse(time.RFC3339, nowStr) + require.NoError(t, err, "could not parse time "+nowStr) + timeNow = func() time.Time { + return now + } + + _, bulker := ftesting.SetupCleanIndex(context.Background(), t, FleetAgents) + + migratedAgents, err := migrate(context.Background(), bulker, migrateAgentOutputs) + require.NoError(t, err) + + assert.Equal(t, 0, migratedAgents) +} diff --git a/internal/pkg/es/error.go b/internal/pkg/es/error.go index 79b07499c..a5e575df5 100644 --- a/internal/pkg/es/error.go +++ b/internal/pkg/es/error.go @@ -37,17 +37,25 @@ func (e ErrElastic) Error() string { // Otherwise were getting: "elastic fail 404::" msg := "elastic fail " var b strings.Builder - b.Grow(len(msg) + 5 + len(e.Type) + len(e.Reason)) + b.Grow(len(msg) + 11 + len(e.Type) + len(e.Reason) + len(e.Cause.Type) + len(e.Cause.Reason)) b.WriteString(msg) b.WriteString(strconv.Itoa(e.Status)) if e.Type != "" { - b.WriteString(":") + b.WriteString(": ") b.WriteString(e.Type) } if e.Reason != "" { - b.WriteString(":") + b.WriteString(": ") b.WriteString(e.Reason) } + if e.Cause.Type != "" { + b.WriteString(": ") + b.WriteString(e.Cause.Type) + } + if e.Cause.Reason != "" { + b.WriteString(": ") + b.WriteString(e.Cause.Reason) + } return b.String() } @@ -83,8 +91,8 @@ func TranslateError(status int, e *ErrorT) error { Type string Reason string }{ - e.Cause.Type, - e.Cause.Reason, + Type: e.Cause.Type, + Reason: e.Cause.Reason, }, } } diff --git a/internal/pkg/model/ext.go b/internal/pkg/model/ext.go index d89787855..4a11bbe08 100644 --- a/internal/pkg/model/ext.go +++ b/internal/pkg/model/ext.go @@ -27,14 +27,36 @@ func (m *Server) SetTime(t time.Time) { } // CheckDifferentVersion returns Agent version if it is different from ver, otherwise return empty string -func (m *Agent) CheckDifferentVersion(ver string) string { - if m == nil { +func (a *Agent) CheckDifferentVersion(ver string) string { + if a == nil { return "" } - if m.Agent == nil || ver != m.Agent.Version { + if a.Agent == nil || ver != a.Agent.Version { return ver } return "" } + +// APIKeyIDs returns all the API keys, the valid, in-use as well as the one +// marked to be retired. +func (a *Agent) APIKeyIDs() []string { + if a == nil { + return nil + } + keys := make([]string, 0, len(a.Outputs)+1) + if a.AccessAPIKeyID != "" { + keys = append(keys, a.AccessAPIKeyID) + } + + for _, output := range a.Outputs { + keys = append(keys, output.APIKeyID) + for _, key := range output.ToRetireAPIKeyIds { + keys = append(keys, key.ID) + } + } + + return keys + +} diff --git a/internal/pkg/model/ext_test.go b/internal/pkg/model/ext_test.go index e48194b30..527570270 100644 --- a/internal/pkg/model/ext_test.go +++ b/internal/pkg/model/ext_test.go @@ -2,15 +2,13 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build !integration -// +build !integration - package model import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" ) func TestAgentGetNewVersion(t *testing.T) { @@ -85,3 +83,54 @@ func TestAgentGetNewVersion(t *testing.T) { }) } } + +func TestAgentAPIKeyIDs(t *testing.T) { + tcs := []struct { + name string + agent Agent + want []string + }{ + { + name: "no API key marked to be retired", + agent: Agent{ + AccessAPIKeyID: "access_api_key_id", + Outputs: map[string]*PolicyOutput{ + "p1": {APIKeyID: "p1_api_key_id"}, + "p2": {APIKeyID: "p2_api_key_id"}, + }, + }, + want: []string{"access_api_key_id", "p1_api_key_id", "p2_api_key_id"}, + }, + { + name: "with API key marked to be retired", + agent: Agent{ + AccessAPIKeyID: "access_api_key_id", + Outputs: map[string]*PolicyOutput{ + "p1": { + APIKeyID: "p1_api_key_id", + ToRetireAPIKeyIds: []ToRetireAPIKeyIdsItems{{ + ID: "p1_to_retire_key", + }}}, + "p2": { + APIKeyID: "p2_api_key_id", + ToRetireAPIKeyIds: []ToRetireAPIKeyIdsItems{{ + ID: "p2_to_retire_key", + }}}, + }, + }, + want: []string{ + "access_api_key_id", "p1_api_key_id", "p2_api_key_id", + "p1_to_retire_key", "p2_to_retire_key"}, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.agent.APIKeyIDs() + + // if A contains B and B contains A => A = B + assert.Subset(t, tc.want, got) + assert.Subset(t, got, tc.want) + }) + } +} diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 9fb0d07fc..5d3c844f1 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -124,13 +124,13 @@ type Agent struct { Active bool `json:"active"` Agent *AgentMetadata `json:"agent,omitempty"` - // API key the Elastic Agent uses to authenticate with elasticsearch + // Deprecated. Use Outputs instead. API key the Elastic Agent uses to authenticate with elasticsearch DefaultAPIKey string `json:"default_api_key,omitempty"` - // Default API Key History - DefaultAPIKeyHistory []DefaultAPIKeyHistoryItems `json:"default_api_key_history,omitempty"` + // Deprecated. Use Outputs instead. Default API Key History + DefaultAPIKeyHistory []ToRetireAPIKeyIdsItems `json:"default_api_key_history,omitempty"` - // ID of the API key the Elastic Agent uses to authenticate with elasticsearch + // Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch DefaultAPIKeyID string `json:"default_api_key_id,omitempty"` // Date/time the Elastic Agent enrolled @@ -148,6 +148,9 @@ type Agent struct { // Local metadata information for the Elastic Agent LocalMetadata json.RawMessage `json:"local_metadata,omitempty"` + // Outputs is the policy output data, mapping the output name to its data + Outputs map[string]*PolicyOutput `json:"outputs,omitempty"` + // Packages array Packages []string `json:"packages,omitempty"` @@ -157,7 +160,7 @@ type Agent struct { // The policy ID for the Elastic Agent PolicyID string `json:"policy_id,omitempty"` - // The policy output permissions hash + // Deprecated. Use Outputs instead. The policy output permissions hash PolicyOutputPermissionsHash string `json:"policy_output_permissions_hash,omitempty"` // The current policy revision_idx for the Elastic Agent @@ -250,16 +253,6 @@ type Body struct { type Data struct { } -// DefaultAPIKeyHistoryItems -type DefaultAPIKeyHistoryItems struct { - - // API Key identifier - ID string `json:"id,omitempty"` - - // Date/time the API key was retired - RetiredAt string `json:"retired_at,omitempty"` -} - // EnrollmentAPIKey An Elastic Agent enrollment API key type EnrollmentAPIKey struct { ESDocument @@ -336,6 +329,26 @@ type PolicyLeader struct { Timestamp string `json:"@timestamp,omitempty"` } +// PolicyOutput holds the needed data to manage the output API keys +type PolicyOutput struct { + ESDocument + + // API key the Elastic Agent uses to authenticate with elasticsearch + APIKey string `json:"api_key"` + + // ID of the API key the Elastic Agent uses to authenticate with elasticsearch + APIKeyID string `json:"api_key_id"` + + // The policy output permissions hash + PermissionsHash string `json:"permissions_hash"` + + // API keys to be invalidated on next agent ack + ToRetireAPIKeyIds []ToRetireAPIKeyIdsItems `json:"to_retire_api_key_ids,omitempty"` + + // Type is the output type. Currently only Elasticsearch is supported. + Type string `json:"type"` +} + // Server A Fleet Server type Server struct { ESDocument @@ -357,6 +370,16 @@ type ServerMetadata struct { Version string `json:"version"` } +// ToRetireAPIKeyIdsItems the Output API Keys that were replaced and should be retired +type ToRetireAPIKeyIdsItems struct { + + // API Key identifier + ID string `json:"id,omitempty"` + + // Date/time the API key was retired + RetiredAt string `json:"retired_at,omitempty"` +} + // UserProvidedMetadata User provided metadata information for the Elastic Agent type UserProvidedMetadata struct { } diff --git a/internal/pkg/policy/parsed_policy.go b/internal/pkg/policy/parsed_policy.go index dbf5d3801..029298ef5 100644 --- a/internal/pkg/policy/parsed_policy.go +++ b/internal/pkg/policy/parsed_policy.go @@ -42,7 +42,7 @@ type ParsedPolicy struct { Policy model.Policy Fields map[string]json.RawMessage Roles RoleMapT - Outputs map[string]PolicyOutput + Outputs map[string]Output Default ParsedPolicyDefaults } @@ -91,8 +91,8 @@ func NewParsedPolicy(p model.Policy) (*ParsedPolicy, error) { return pp, nil } -func constructPolicyOutputs(outputsRaw json.RawMessage, roles map[string]RoleT) (map[string]PolicyOutput, error) { - result := make(map[string]PolicyOutput) +func constructPolicyOutputs(outputsRaw json.RawMessage, roles map[string]RoleT) (map[string]Output, error) { + result := make(map[string]Output) outputsMap, err := smap.Parse(outputsRaw) if err != nil { @@ -102,7 +102,7 @@ func constructPolicyOutputs(outputsRaw json.RawMessage, roles map[string]RoleT) for k := range outputsMap { v := outputsMap.GetMap(k) - p := PolicyOutput{ + p := Output{ Name: k, Type: v.GetString(FieldOutputType), } @@ -126,13 +126,13 @@ func parsePerms(permsRaw json.RawMessage) (RoleMapT, error) { // iterate across the keys m := make(RoleMapT, len(permMap)) for k := range permMap { - v := permMap.GetMap(k) if v != nil { var r RoleT // Stable hash on permissions payload + // permission hash created here if r.Sha2, err = v.Hash(); err != nil { return nil, err } diff --git a/internal/pkg/policy/parsed_policy_test.go b/internal/pkg/policy/parsed_policy_test.go index 547cfcf7a..32ef271a7 100644 --- a/internal/pkg/policy/parsed_policy_test.go +++ b/internal/pkg/policy/parsed_policy_test.go @@ -13,7 +13,6 @@ import ( ) func TestNewParsedPolicy(t *testing.T) { - // Run two formatting of the same payload to validate that the sha2 remains the same payloads := []string{ testPolicy, diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 8115d22ec..c2728aa1e 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -32,118 +32,173 @@ var ( ErrFailInjectAPIKey = errors.New("fail inject api key") ) -type PolicyOutput struct { +type Output struct { Name string Type string Role *RoleT } -func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agent *model.Agent, outputMap smap.Map) error { +// Prepare prepares the output p to be sent to the elastic-agent +// The agent might be mutated for an elasticsearch output +func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agent *model.Agent, outputMap smap.Map) error { + zlog = zlog.With(). + Str("fleet.agent.id", agent.Id). + Str("fleet.policy.output.name", p.Name).Logger() + switch p.Type { case OutputTypeElasticsearch: zlog.Debug().Msg("preparing elasticsearch output") - - // The role is required to do api key management - if p.Role == nil { - zlog.Error().Str("name", p.Name).Msg("policy does not contain required output permission section") - return ErrNoOutputPerms + if err := p.prepareElasticsearch(ctx, zlog, bulker, agent, outputMap); err != nil { + return fmt.Errorf("failed to prepare elasticsearch output %q: %w", p.Name, err) } + case OutputTypeLogstash: + zlog.Debug().Msg("preparing logstash output") + zlog.Info().Msg("no actions required for logstash output preparation") + default: + zlog.Error().Msgf("unknown output type: %s; skipping preparation", p.Type) + return fmt.Errorf("encountered unexpected output type while preparing outputs: %s", p.Type) + } + return nil +} + +func (p *Output) prepareElasticsearch( + ctx context.Context, + zlog zerolog.Logger, + bulker bulk.Bulk, + agent *model.Agent, + outputMap smap.Map) error { + // The role is required to do api key management + if p.Role == nil { + zlog.Error(). + Msg("policy does not contain required output permission section") + return ErrNoOutputPerms + } - // Determine whether we need to generate an output ApiKey. - // This is accomplished by comparing the sha2 hash stored in the agent - // record with the precalculated sha2 hash of the role. - - // Note: This will need to be updated when doing multi-cluster elasticsearch support - // Currently, we only have access to the token for the elasticsearch instance fleet-server - // is monitors. When updating for multiple ES instances we need to tie the token to the output. - needNewKey := true - switch { - case agent.DefaultAPIKey == "": - zlog.Debug().Msg("must generate api key as default API key is not present") - case p.Role.Sha2 != agent.PolicyOutputPermissionsHash: - zlog.Debug().Msg("must generate api key as policy output permissions changed") - default: - needNewKey = false - zlog.Debug().Msg("policy output permissions are the same") + output, ok := agent.Outputs[p.Name] + if !ok { + if agent.Outputs == nil { + agent.Outputs = map[string]*model.PolicyOutput{} } - if needNewKey { - zlog.Debug(). - RawJSON("roles", p.Role.Raw). - Str("oldHash", agent.PolicyOutputPermissionsHash). - Str("newHash", p.Role.Sha2). - Msg("Generating a new API key") - - outputAPIKey, err := generateOutputAPIKey(ctx, bulker, agent.Id, p.Name, p.Role.Raw) - if err != nil { - zlog.Error().Err(err).Msg("fail generate output key") - return err - } + zlog.Debug().Msgf("creating agent.Outputs[%s]", p.Name) + output = &model.PolicyOutput{} + agent.Outputs[p.Name] = output + } - agent.DefaultAPIKey = outputAPIKey.Agent() + // Determine whether we need to generate an output ApiKey. + // This is accomplished by comparing the sha2 hash stored in the corresponding + // output in the agent record with the precalculated sha2 hash of the role. - // When a new keys is generated we need to update the Agent record, - // this will need to be updated when multiples Elasticsearch output - // are used. - zlog.Info(). - Str("hash.sha256", p.Role.Sha2). - Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID). - Msg("Updating agent record to pick up default output key.") + // Note: This will need to be updated when doing multi-cluster elasticsearch support + // Currently, we assume all ES outputs are the same ES fleet-server is connected to. + needNewKey := true + switch { + case output.APIKey == "": + zlog.Debug().Msg("must generate api key as default API key is not present") + case p.Role.Sha2 != output.PermissionsHash: + // the is actually the OutputPermissionsHash for the default hash. The Agent + // document on ES does not have OutputPermissionsHash for any other output + // besides the default one. It seems to me error-prone to rely on the default + // output permissions hash to generate new API keys for other outputs. + zlog.Debug().Msg("must generate api key as policy output permissions changed") + default: + needNewKey = false + zlog.Debug().Msg("policy output permissions are the same") + } - fields := map[string]interface{}{ - dl.FieldDefaultAPIKey: outputAPIKey.Agent(), - dl.FieldDefaultAPIKeyID: outputAPIKey.ID, - dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, - } - if agent.DefaultAPIKeyID != "" { - fields[dl.FieldDefaultAPIKeyHistory] = model.DefaultAPIKeyHistoryItems{ - ID: agent.DefaultAPIKeyID, - RetiredAt: time.Now().UTC().Format(time.RFC3339), - } - } + if needNewKey { + zlog.Debug(). + RawJSON("fleet.policy.roles", p.Role.Raw). + Str("fleet.policy.default.oldHash", output.PermissionsHash). + Str("fleet.policy.default.newHash", p.Role.Sha2). + Msg("Generating a new API key") - // Using painless script to append the old keys to the history - body, err := renderUpdatePainlessScript(fields) + ctx := zlog.WithContext(ctx) + outputAPIKey, err := + generateOutputAPIKey(ctx, bulker, agent.Id, p.Name, p.Role.Raw) + if err != nil { + return fmt.Errorf("failed generate output API key: %w", err) + } - if err != nil { - return err - } + output.Type = OutputTypeElasticsearch + output.APIKey = outputAPIKey.Agent() + output.APIKeyID = outputAPIKey.ID + output.PermissionsHash = p.Role.Sha2 // for the sake of consistency + + // When a new keys is generated we need to update the Agent record, + // this will need to be updated when multiples remote Elasticsearch output + // are supported. + zlog.Info(). + Str("fleet.policy.role.hash.sha256", p.Role.Sha2). + Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID). + Msg("Updating agent record to pick up default output key.") - if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil { - zlog.Error().Err(err).Msg("fail update agent record") - return err + fields := map[string]interface{}{ + dl.FieldPolicyOutputAPIKey: outputAPIKey.Agent(), + dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID, + dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, + } + if output.APIKeyID != "" { + fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{ + ID: output.APIKeyID, + RetiredAt: time.Now().UTC().Format(time.RFC3339), } } - // Always insert the `api_key` as part of the output block, this is required - // because only fleet server knows the api key for the specific agent, if we don't - // add it the agent will not receive the `api_key` and will not be able to connect - // to Elasticsearch. - // - // We need to investigate allocation with the new LS output, we had optimization - // in place to reduce number of agent policy allocation when sending the updated - // agent policy to multiple agents. - // See: https://github.com/elastic/fleet-server/issues/1301 - if ok := setMapObj(outputMap, agent.DefaultAPIKey, p.Name, "api_key"); !ok { - return ErrFailInjectAPIKey + // Using painless script to append the old keys to the history + body, err := renderUpdatePainlessScript(p.Name, fields) + if err != nil { + return fmt.Errorf("could no tupdate painless script: %w", err) } - case OutputTypeLogstash: - zlog.Debug().Msg("preparing logstash output") - zlog.Info().Msg("no actions required for logstash output preparation") - default: - zlog.Error().Msgf("unknown output type: %s; skipping preparation", p.Type) - return fmt.Errorf("encountered unexpected output type while preparing outputs: %s", p.Type) + + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil { + zlog.Error().Err(err).Msg("fail update agent record") + return fmt.Errorf("fail update agent record: %w", err) + } + } + + // Always insert the `api_key` as part of the output block, this is required + // because only fleet server knows the api key for the specific agent, if we don't + // add it the agent will not receive the `api_key` and will not be able to connect + // to Elasticsearch. + // + // We need to investigate allocation with the new LS output, we had optimization + // in place to reduce number of agent policy allocation when sending the updated + // agent policy to multiple agents. + // See: https://github.com/elastic/fleet-server/issues/1301 + if err := setMapObj(outputMap, output.APIKey, p.Name, "api_key"); err != nil { + return err } + return nil } -func renderUpdatePainlessScript(fields map[string]interface{}) ([]byte, error) { +func renderUpdatePainlessScript(outputName string, fields map[string]interface{}) ([]byte, error) { var source strings.Builder + + // prepare agent.elasticsearch_outputs[OUTPUT_NAME] + source.WriteString(fmt.Sprintf(` +if (ctx._source['outputs']==null) + {ctx._source['outputs']=new HashMap();} +if (ctx._source['outputs']['%s']==null) + {ctx._source['outputs']['%s']=new HashMap();} +`, outputName, outputName)) + for field := range fields { - if field == dl.FieldDefaultAPIKeyHistory { - source.WriteString(fmt.Sprint("if (ctx._source.", field, "==null) {ctx._source.", field, "=new ArrayList();} ctx._source.", field, ".add(params.", field, ");")) + if field == dl.FieldPolicyOutputToRetireAPIKeyIDs { + // dl.FieldPolicyOutputToRetireAPIKeyIDs is a special case. + // It's an array that gets deleted when the keys are invalidated. + // Thus, append the old API key ID, create the field if necessary. + source.WriteString(fmt.Sprintf(` +if (ctx._source['outputs']['%s'].%s==null) + {ctx._source['outputs']['%s'].%s=new ArrayList();} +ctx._source['outputs']['%s'].%s.add(params.%s); +`, outputName, field, outputName, field, outputName, field, field)) } else { - source.WriteString(fmt.Sprint("ctx._source.", field, "=", "params.", field, ";")) + // Update the other fields + source.WriteString(fmt.Sprintf(` +ctx._source['outputs']['%s'].%s=params.%s;`, + outputName, field, field)) } } @@ -158,36 +213,45 @@ func renderUpdatePainlessScript(fields map[string]interface{}) ([]byte, error) { return body, err } -func generateOutputAPIKey(ctx context.Context, bulk bulk.Bulk, agentID, outputName string, roles []byte) (*apikey.APIKey, error) { +func generateOutputAPIKey( + ctx context.Context, + bulk bulk.Bulk, + agentID, + outputName string, + roles []byte) (*apikey.APIKey, error) { name := fmt.Sprintf("%s:%s", agentID, outputName) + zerolog.Ctx(ctx).Info().Msgf("generating output API key %s for agent ID %s", + name, agentID) return bulk.APIKeyCreate( ctx, name, "", roles, - apikey.NewMetadata(agentID, apikey.TypeOutput), + apikey.NewMetadata(agentID, outputName, apikey.TypeOutput), ) } -func setMapObj(obj map[string]interface{}, val interface{}, keys ...string) bool { +func setMapObj(obj map[string]interface{}, val interface{}, keys ...string) error { if len(keys) == 0 { - return false + return fmt.Errorf("no key to be updated: %w", ErrFailInjectAPIKey) } for _, k := range keys[:len(keys)-1] { v, ok := obj[k] if !ok { - return false + return fmt.Errorf("no key %q not present on MapObj: %w", + k, ErrFailInjectAPIKey) } obj, ok = v.(map[string]interface{}) if !ok { - return false + return fmt.Errorf("cannot cast %T to map[string]interface{}: %w", + obj, ErrFailInjectAPIKey) } } k := keys[len(keys)-1] obj[k] = val - return true + return nil } diff --git a/internal/pkg/policy/policy_output_integration_test.go b/internal/pkg/policy/policy_output_integration_test.go new file mode 100644 index 000000000..6acd0d9fa --- /dev/null +++ b/internal/pkg/policy/policy_output_integration_test.go @@ -0,0 +1,127 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package policy + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/model" + ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" +) + +func TestRenderUpdatePainlessScript(t *testing.T) { + tts := []struct { + name string + + existingToRetireAPIKeyIds []model.ToRetireAPIKeyIdsItems + }{ + { + name: "to_retire_api_key_ids is empty", + }, + { + name: "to_retire_api_key_ids is not empty", + existingToRetireAPIKeyIds: []model.ToRetireAPIKeyIdsItems{{ + ID: "pre_existing_ID", RetiredAt: "pre_existing__RetiredAt"}}, + }, + } + + for _, tt := range tts { + t.Run(tt.name, func(t *testing.T) { + outputPermissionSha := "new_permissionSHA_" + tt.name + outputName := "output_" + tt.name + outputAPIKey := bulk.APIKey{ID: "new_ID", Key: "new-key"} + + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, dl.FleetAgents) + + now := time.Now().UTC() + nowStr := now.Format(time.RFC3339) + + agentID := uuid.Must(uuid.NewV4()).String() + policyID := uuid.Must(uuid.NewV4()).String() + + previousAPIKey := bulk.APIKey{ + ID: "old_" + outputAPIKey.ID, + Key: "old_" + outputAPIKey.Key, + } + + wantOutputs := map[string]*model.PolicyOutput{ + outputName: { + APIKey: outputAPIKey.Agent(), + APIKeyID: outputAPIKey.ID, + PermissionsHash: outputPermissionSha, + Type: OutputTypeElasticsearch, + ToRetireAPIKeyIds: append(tt.existingToRetireAPIKeyIds, + model.ToRetireAPIKeyIdsItems{ + ID: previousAPIKey.ID, RetiredAt: nowStr}), + }, + } + + agentModel := model.Agent{ + PolicyID: policyID, + Active: true, + LastCheckin: nowStr, + LastCheckinStatus: "", + UpdatedAt: nowStr, + EnrolledAt: nowStr, + Outputs: map[string]*model.PolicyOutput{ + outputName: { + Type: OutputTypeElasticsearch, + APIKey: previousAPIKey.Agent(), + APIKeyID: previousAPIKey.ID, + PermissionsHash: "old_" + outputPermissionSha, + }, + }, + } + if tt.existingToRetireAPIKeyIds != nil { + agentModel.Outputs[outputName].ToRetireAPIKeyIds = + tt.existingToRetireAPIKeyIds + } + + body, err := json.Marshal(agentModel) + require.NoError(t, err) + + _, err = bulker.Create( + context.Background(), index, agentID, body, bulk.WithRefresh()) + require.NoError(t, err) + + fields := map[string]interface{}{ + dl.FieldPolicyOutputAPIKey: outputAPIKey.Agent(), + dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID, + dl.FieldPolicyOutputPermissionsHash: outputPermissionSha, + dl.FieldPolicyOutputToRetireAPIKeyIDs: model.ToRetireAPIKeyIdsItems{ + ID: previousAPIKey.ID, RetiredAt: nowStr}, + } + + got, err := renderUpdatePainlessScript(outputName, fields) + require.NoError(t, err, "renderUpdatePainlessScript returned an unexpected error") + + err = bulker.Update(context.Background(), dl.FleetAgents, agentID, got) + require.NoError(t, err, "bulker.Update failed") + + // there is some refresh thing that needs time, I didn't manage to find + // how ot fix it at the requests to ES level, thus this timeout here. + time.Sleep(time.Second) + + gotAgent, err := dl.FindAgent( + context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) + require.NoError(t, err) + + assert.Equal(t, agentID, gotAgent.Id) + assert.Len(t, gotAgent.Outputs, len(wantOutputs)) + assert.Equal(t, wantOutputs, gotAgent.Outputs) + }) + } +} diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go index 1e90cee57..d66275d04 100644 --- a/internal/pkg/policy/policy_output_test.go +++ b/internal/pkg/policy/policy_output_test.go @@ -8,6 +8,7 @@ import ( "context" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -23,7 +24,7 @@ var TestPayload []byte func TestPolicyLogstashOutputPrepare(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := PolicyOutput{ + po := Output{ Type: OutputTypeLogstash, Name: "test output", Role: &RoleT{ @@ -39,7 +40,7 @@ func TestPolicyLogstashOutputPrepare(t *testing.T) { func TestPolicyLogstashOutputPrepareNoRole(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := PolicyOutput{ + po := Output{ Type: OutputTypeLogstash, Name: "test output", Role: nil, @@ -54,7 +55,7 @@ func TestPolicyLogstashOutputPrepareNoRole(t *testing.T) { func TestPolicyDefaultLogstashOutputPrepare(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := PolicyOutput{ + po := Output{ Type: OutputTypeLogstash, Name: "test output", Role: &RoleT{ @@ -71,7 +72,7 @@ func TestPolicyDefaultLogstashOutputPrepare(t *testing.T) { func TestPolicyESOutputPrepareNoRole(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - po := PolicyOutput{ + po := Output{ Type: OutputTypeElasticsearch, Name: "test output", Role: nil, @@ -86,8 +87,11 @@ func TestPolicyOutputESPrepare(t *testing.T) { t.Run("Permission hash == Agent Permission Hash no need to regenerate the key", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() + + apiKey := bulk.APIKey{ID: "test_id_existing", Key: "existing-key"} + hashPerm := "abc123" - po := PolicyOutput{ + output := Output{ Type: OutputTypeElasticsearch, Name: "test output", Role: &RoleT{ @@ -101,29 +105,62 @@ func TestPolicyOutputESPrepare(t *testing.T) { } testAgent := &model.Agent{ - DefaultAPIKey: "test_id:EXISTING-KEY", - PolicyOutputPermissionsHash: hashPerm, + Outputs: map[string]*model.PolicyOutput{ + output.Name: { + ESDocument: model.ESDocument{}, + APIKey: apiKey.Agent(), + ToRetireAPIKeyIds: nil, + APIKeyID: apiKey.ID, + PermissionsHash: hashPerm, + Type: OutputTypeElasticsearch, + }, + }, } - err := po.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap("test output")["api_key"].(string) + key, ok := policyMap.GetMap(output.Name)["api_key"].(string) + gotOutput := testAgent.Outputs[output.Name] - require.True(t, ok, "unable to case api key") - require.Equal(t, testAgent.DefaultAPIKey, key) - bulker.AssertNotCalled(t, "Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) - bulker.AssertNotCalled(t, "APIKeyCreate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + require.True(t, ok, "api key not present on policy map") + assert.Equal(t, apiKey.Agent(), key) + + assert.Equal(t, apiKey.Agent(), gotOutput.APIKey) + assert.Equal(t, apiKey.ID, gotOutput.APIKeyID) + assert.Equal(t, output.Role.Sha2, gotOutput.PermissionsHash) + assert.Equal(t, output.Type, gotOutput.Type) + assert.Empty(t, gotOutput.ToRetireAPIKeyIds) + + // Old model must always remain empty + assert.Empty(t, testAgent.DefaultAPIKey) + assert.Empty(t, testAgent.DefaultAPIKeyID) + assert.Empty(t, testAgent.DefaultAPIKeyHistory) + assert.Empty(t, testAgent.PolicyOutputPermissionsHash) + + bulker.AssertNotCalled(t, "Update", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) + bulker.AssertNotCalled(t, "APIKeyCreate", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) bulker.AssertExpectations(t) }) t.Run("Permission hash != Agent Permission Hash need to regenerate the key", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - bulker.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - bulker.On("APIKeyCreate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&bulk.APIKey{"abc", "new-key"}, nil).Once() //nolint:govet // test case - po := PolicyOutput{ + oldAPIKey := bulk.APIKey{ID: "test_id", Key: "EXISTING-KEY"} + wantAPIKey := bulk.APIKey{ID: "abc", Key: "new-key"} + hashPerm := "old-HASH" + + bulker.On("Update", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil).Once() + bulker.On("APIKeyCreate", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&wantAPIKey, nil).Once() + + output := Output{ Type: OutputTypeElasticsearch, Name: "test output", Role: &RoleT{ @@ -137,27 +174,55 @@ func TestPolicyOutputESPrepare(t *testing.T) { } testAgent := &model.Agent{ - DefaultAPIKey: "test_id:EXISTING-KEY", - PolicyOutputPermissionsHash: "old-HASH", + Outputs: map[string]*model.PolicyOutput{ + output.Name: { + ESDocument: model.ESDocument{}, + APIKey: oldAPIKey.Agent(), + ToRetireAPIKeyIds: nil, + APIKeyID: oldAPIKey.ID, + PermissionsHash: hashPerm, + Type: OutputTypeElasticsearch, + }, + }, } - err := po.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap("test output")["api_key"].(string) + key, ok := policyMap.GetMap(output.Name)["api_key"].(string) + gotOutput := testAgent.Outputs[output.Name] require.True(t, ok, "unable to case api key") - require.Equal(t, "abc:new-key", key) + require.Equal(t, wantAPIKey.Agent(), key) + + assert.Equal(t, wantAPIKey.Agent(), gotOutput.APIKey) + assert.Equal(t, wantAPIKey.ID, gotOutput.APIKeyID) + assert.Equal(t, output.Role.Sha2, gotOutput.PermissionsHash) + assert.Equal(t, output.Type, gotOutput.Type) + + // assert.Contains(t, gotOutput.ToRetireAPIKeyIds, oldAPIKey.ID) // TODO: assert on bulker.Update + + // Old model must always remain empty + assert.Empty(t, testAgent.DefaultAPIKey) + assert.Empty(t, testAgent.DefaultAPIKeyID) + assert.Empty(t, testAgent.DefaultAPIKeyHistory) + assert.Empty(t, testAgent.PolicyOutputPermissionsHash) + bulker.AssertExpectations(t) }) t.Run("Generate API Key on new Agent", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() - bulker.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - bulker.On("APIKeyCreate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&bulk.APIKey{"abc", "new-key"}, nil).Once() //nolint:govet // test case - - po := PolicyOutput{ + bulker.On("Update", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil).Once() + apiKey := bulk.APIKey{ID: "abc", Key: "new-key"} + bulker.On("APIKeyCreate", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&apiKey, nil).Once() + + output := Output{ Type: OutputTypeElasticsearch, Name: "test output", Role: &RoleT{ @@ -170,15 +235,29 @@ func TestPolicyOutputESPrepare(t *testing.T) { "test output": map[string]interface{}{}, } - testAgent := &model.Agent{} + testAgent := &model.Agent{Outputs: map[string]*model.PolicyOutput{}} - err := po.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap("test output")["api_key"].(string) + key, ok := policyMap.GetMap(output.Name)["api_key"].(string) + gotOutput := testAgent.Outputs[output.Name] require.True(t, ok, "unable to case api key") - require.Equal(t, "abc:new-key", key) + assert.Equal(t, apiKey.Agent(), key) + + assert.Equal(t, apiKey.Agent(), gotOutput.APIKey) + assert.Equal(t, apiKey.ID, gotOutput.APIKeyID) + assert.Equal(t, output.Role.Sha2, gotOutput.PermissionsHash) + assert.Equal(t, output.Type, gotOutput.Type) + assert.Empty(t, gotOutput.ToRetireAPIKeyIds) + + // Old model must always remain empty + assert.Empty(t, testAgent.DefaultAPIKey) + assert.Empty(t, testAgent.DefaultAPIKeyID) + assert.Empty(t, testAgent.DefaultAPIKeyHistory) + assert.Empty(t, testAgent.PolicyOutputPermissionsHash) + bulker.AssertExpectations(t) }) } diff --git a/internal/pkg/testing/esutil/bootstrap.go b/internal/pkg/testing/esutil/bootstrap.go index e2aafce76..978f95a75 100644 --- a/internal/pkg/testing/esutil/bootstrap.go +++ b/internal/pkg/testing/esutil/bootstrap.go @@ -10,7 +10,7 @@ import ( "github.com/elastic/go-elasticsearch/v7" ) -// EnsureIndex sets up the index if it doesn't exists, utilized for integration tests at the moment +// EnsureIndex sets up the index if it doesn't exist. It's utilized for integration tests at the moment. func EnsureIndex(ctx context.Context, cli *elasticsearch.Client, name, mapping string) error { err := EnsureTemplate(ctx, cli, name, mapping, false) if err != nil { diff --git a/internal/pkg/testing/setup.go b/internal/pkg/testing/setup.go index 8dac38cdc..8f38ba7e6 100644 --- a/internal/pkg/testing/setup.go +++ b/internal/pkg/testing/setup.go @@ -98,7 +98,7 @@ func SetupCleanIndex(ctx context.Context, t *testing.T, index string, opts ...bu func CleanIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, index string) string { t.Helper() - t.Helper() + tmpl := dsl.NewTmpl() root := dsl.NewRoot() root.Query().MatchAll() @@ -106,25 +106,25 @@ func CleanIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, index strin query, err := q.Render(make(map[string]interface{})) if err != nil { - t.Fatal(err) + t.Fatalf("could not clean index: failed to render query template: %v", err) } cli := bulker.Client() + res, err := cli.API.DeleteByQuery([]string{index}, bytes.NewReader(query), cli.API.DeleteByQuery.WithContext(ctx), cli.API.DeleteByQuery.WithRefresh(true), ) - if err != nil { - t.Fatal(err) + t.Fatalf("could not clean index %s, DeleteByQuery failed: %v", + index, err) } defer res.Body.Close() var esres es.DeleteByQueryResponse - err = json.NewDecoder(res.Body).Decode(&esres) if err != nil { - t.Fatal(err) + t.Fatalf("could not decode ES response: %v", err) } if res.IsError() { @@ -135,9 +135,9 @@ func CleanIndex(ctx context.Context, t *testing.T, bulker bulk.Bulk, index strin } } } - if err != nil { - t.Fatal(err) + t.Fatalf("ES returned an error: %v. body: %q", err, res) } + return index } diff --git a/model/schema.json b/model/schema.json index 91bda492f..423cfe3fc 100644 --- a/model/schema.json +++ b/model/schema.json @@ -244,6 +244,7 @@ "name" ] }, + "server-metadata": { "title": "Server Metadata", "description": "A Fleet Server metadata", @@ -264,6 +265,7 @@ "version" ] }, + "server": { "title": "Server", "description": "A Fleet Server", @@ -284,6 +286,7 @@ "server" ] }, + "policy": { "title": "Policy", "description": "A policy that an Elastic Agent is attached to", @@ -329,6 +332,7 @@ "default_fleet_server" ] }, + "policy-leader": { "title": "Policy Leader", "description": "The current leader Fleet Server for a policy", @@ -345,6 +349,60 @@ "server" ] }, + + "to_retire_api_key_ids": { + "type": "array", + "items": { + "description": "the Output API Keys that were replaced and should be retired", + "type": "object", + "properties": { + "id": { + "description": "API Key identifier", + "type": "string" + }, + "retired_at": { + "description": "Date/time the API key was retired", + "type": "string", + "format": "date-time" + } + } + } + }, + + "policy_output" : { + "type": "object", + "description": "holds the needed data to manage the output API keys", + "properties": { + "api_key": { + "description": "API key the Elastic Agent uses to authenticate with elasticsearch", + "type": "string" + }, + "to_retire_api_key_ids": { + "description": "API keys to be invalidated on next agent ack", + "$ref": "#/definitions/to_retire_api_key_ids" + }, + "api_key_id": { + "description": "ID of the API key the Elastic Agent uses to authenticate with elasticsearch", + "type": "string" + }, + "permissions_hash": { + "description": "The policy output permissions hash", + "type": "string" + }, + "type": { + "description": "Type is the output type. Currently only Elasticsearch is supported.", + "type": "string" + } + }, + "required": [ + "api_key", + "api_key_history", + "api_key_id", + "permissions_hash", + "type" + ] + }, + "agent": { "title": "Agent", "description": "An Elastic Agent that has enrolled into Fleet", @@ -441,7 +499,7 @@ "type": "integer" }, "policy_output_permissions_hash": { - "description": "The policy output permissions hash", + "description": "Deprecated. Use Outputs instead. The policy output permissions hash", "type": "string" }, "last_updated": { @@ -459,30 +517,21 @@ "type": "string" }, "default_api_key_id": { - "description": "ID of the API key the Elastic Agent uses to authenticate with elasticsearch", + "description": "Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch", "type": "string" }, "default_api_key": { - "description": "API key the Elastic Agent uses to authenticate with elasticsearch", + "description": "Deprecated. Use Outputs instead. API key the Elastic Agent uses to authenticate with elasticsearch", "type": "string" }, "default_api_key_history": { - "description": "Default API Key History", - "type": "array", - "items": { - "type": "object", - "properties": { - "id": { - "description": "API Key identifier", - "type": "string" - }, - "retired_at": { - "description": "Date/time the API key was retired", - "type": "string", - "format": "date-time" - } - } - } + "description": "Deprecated. Use Outputs instead. Default API Key History", + "$ref": "#/definitions/to_retire_api_key_ids" + }, + "outputs": { + "description": "Outputs is the policy output data, mapping the output name to its data", + "type": "object", + "additionalProperties": { "$ref": "#/definitions/policy_output"} }, "updated_at": { "description": "Date/time the Elastic Agent was last updated", @@ -512,6 +561,7 @@ "status" ] }, + "enrollment_api_key": { "title": "Enrollment API key", "description": "An Elastic Agent enrollment API key", @@ -555,6 +605,7 @@ ] } }, + "checkin": { "title": "Checkin", "description": "An Elastic Agent checkin to Fleet", From d5bde6f304eb217ae5445da23ec5651f0977c05b Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Mon, 19 Sep 2022 21:35:07 +0200 Subject: [PATCH 2/5] avoid new API key being marked for invalidation --- internal/pkg/dl/constants.go | 1 + internal/pkg/policy/policy_output.go | 22 ++++-- .../policy/policy_output_integration_test.go | 73 +++++++++++++++++++ 3 files changed, 89 insertions(+), 7 deletions(-) diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 034c23541..419f92874 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -43,6 +43,7 @@ const ( FieldPolicyRevisionIdx = "policy_revision_idx" FieldRevisionIdx = "revision_idx" FieldUnenrolledReason = "unenrolled_reason" + FiledType = "type" FieldActive = "active" FieldUpdatedAt = "updated_at" diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index c2728aa1e..fefd192d3 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -74,8 +74,8 @@ func (p *Output) prepareElasticsearch( return ErrNoOutputPerms } - output, ok := agent.Outputs[p.Name] - if !ok { + output, foundOutput := agent.Outputs[p.Name] + if !foundOutput { if agent.Outputs == nil { agent.Outputs = map[string]*model.PolicyOutput{} } @@ -120,11 +120,6 @@ func (p *Output) prepareElasticsearch( return fmt.Errorf("failed generate output API key: %w", err) } - output.Type = OutputTypeElasticsearch - output.APIKey = outputAPIKey.Agent() - output.APIKeyID = outputAPIKey.ID - output.PermissionsHash = p.Role.Sha2 // for the sake of consistency - // When a new keys is generated we need to update the Agent record, // this will need to be updated when multiples remote Elasticsearch output // are supported. @@ -138,6 +133,10 @@ func (p *Output) prepareElasticsearch( dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID, dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, } + + if !foundOutput { + fields[dl.FiledType] = OutputTypeElasticsearch + } if output.APIKeyID != "" { fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{ ID: output.APIKeyID, @@ -155,6 +154,15 @@ func (p *Output) prepareElasticsearch( zlog.Error().Err(err).Msg("fail update agent record") return fmt.Errorf("fail update agent record: %w", err) } + + // Now that all is done, we can update the output on the agent variable + // Right not it's more for consistency and to ensure the in-memory agent + // data is correct and in sync with ES, so it can be safely used after + // this method returns. + output.Type = OutputTypeElasticsearch + output.APIKey = outputAPIKey.Agent() + output.APIKeyID = outputAPIKey.ID + output.PermissionsHash = p.Role.Sha2 // for the sake of consistency } // Always insert the `api_key` as part of the output block, this is required diff --git a/internal/pkg/policy/policy_output_integration_test.go b/internal/pkg/policy/policy_output_integration_test.go index 6acd0d9fa..5c8a254b8 100644 --- a/internal/pkg/policy/policy_output_integration_test.go +++ b/internal/pkg/policy/policy_output_integration_test.go @@ -13,12 +13,14 @@ import ( "time" "github.com/gofrs/uuid" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" + "github.com/elastic/fleet-server/v7/internal/pkg/smap" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" ) @@ -125,3 +127,74 @@ func TestRenderUpdatePainlessScript(t *testing.T) { }) } } + +func TestPolicyOutputESPrepareRealES(t *testing.T) { + index, bulker := ftesting.SetupCleanIndex(context.Background(), t, dl.FleetAgents) + + agentID := createAgent(t, index, bulker) + agent, err := dl.FindAgent( + context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) + if err != nil { + require.NoError(t, err, "failed to find agent ID %q", agentID) + } + + output := Output{ + Type: OutputTypeElasticsearch, + Name: "test output", + Role: &RoleT{ + Sha2: "new-hash", + Raw: TestPayload, + }, + } + policyMap := smap.Map{ + "test output": map[string]interface{}{}, + } + + err = output.prepareElasticsearch( + context.Background(), zerolog.Nop(), bulker, &agent, policyMap) + require.NoError(t, err) + + // need to wait a bit before querying the agent again + // TODO: find a better way to query the updated agent + time.Sleep(time.Second) + + got, err := dl.FindAgent( + context.Background(), bulker, dl.QueryAgentByID, dl.FieldID, agentID, dl.WithIndexName(index)) + if err != nil { + require.NoError(t, err, "failed to find agent ID %q", agentID) + } + + gotOutput, ok := got.Outputs[output.Name] + require.True(t, ok, "no '%s' output fouled on agent document", output.Name) + + assert.Empty(t, gotOutput.ToRetireAPIKeyIds) + assert.Equal(t, gotOutput.Type, OutputTypeElasticsearch) + assert.Equal(t, gotOutput.PermissionsHash, output.Role.Sha2) + assert.NotEmpty(t, gotOutput.APIKey) + assert.NotEmpty(t, gotOutput.APIKeyID) +} + +func createAgent(t *testing.T, index string, bulker bulk.Bulk) string { + const nowStr = "2022-08-12T16:50:05Z" + + agentID := uuid.Must(uuid.NewV4()).String() + policyID := uuid.Must(uuid.NewV4()).String() + + agentModel := model.Agent{ + PolicyID: policyID, + Active: true, + LastCheckin: nowStr, + LastCheckinStatus: "", + UpdatedAt: nowStr, + EnrolledAt: nowStr, + } + + body, err := json.Marshal(agentModel) + require.NoError(t, err) + + _, err = bulker.Create( + context.Background(), index, agentID, body, bulk.WithRefresh()) + require.NoError(t, err) + + return agentID +} From c14692388ae35500489237d936a5244d446b9dba Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Tue, 20 Sep 2022 12:29:10 +0200 Subject: [PATCH 3/5] Suggested/conflicts 2 (#2) * merge conflicts * conflict * conflicts * lint * skip empty keys * wee changes and fix Co-authored-by: Michal Pristas --- dev-tools/integration/.env | 2 +- internal/pkg/api/handleAck.go | 103 ++++++++- internal/pkg/api/handleAck_test.go | 40 ++-- internal/pkg/api/handleEnroll.go | 2 +- internal/pkg/apikey/apikey.go | 36 ++-- .../pkg/apikey/apikey_integration_test.go | 61 +++++- internal/pkg/bulk/block.go | 2 + internal/pkg/bulk/engine.go | 8 +- internal/pkg/bulk/opApiKey.go | 200 +++++++++++++++++- internal/pkg/bulk/opt.go | 13 ++ internal/pkg/bulk/queue.go | 3 + internal/pkg/config/config_test.go | 13 +- internal/pkg/config/output.go | 28 +-- internal/pkg/es/bulk_update_api_key.go | 109 ++++++++++ internal/pkg/policy/policy_output.go | 170 ++++++++++++++- internal/pkg/policy/policy_output_test.go | 12 +- internal/pkg/testing/bulk.go | 7 +- 17 files changed, 733 insertions(+), 76 deletions(-) create mode 100644 internal/pkg/es/bulk_update_api_key.go diff --git a/dev-tools/integration/.env b/dev-tools/integration/.env index 8d460e721..b4bb4fe9f 100644 --- a/dev-tools/integration/.env +++ b/dev-tools/integration/.env @@ -1,4 +1,4 @@ -ELASTICSEARCH_VERSION=8.5.0-c7913db3-SNAPSHOT +ELASTICSEARCH_VERSION=8.5.0-56d2c52d-SNAPSHOT ELASTICSEARCH_USERNAME=elastic ELASTICSEARCH_PASSWORD=changeme TEST_ELASTICSEARCH_HOSTS=localhost:9200 \ No newline at end of file diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 1261d8f6c..2a502f70c 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/policy" + "github.com/elastic/fleet-server/v7/internal/pkg/smap" "github.com/julienschmidt/httprouter" "github.com/rs/zerolog" @@ -351,10 +352,66 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag return nil } - ack.invalidateAPIKeys(ctx, agent) + for _, output := range agent.Outputs { + if output.Type != policy.OutputTypeElasticsearch { + continue + } + + err := ack.updateAPIKey(ctx, + zlog, + agent.Id, + currRev, currCoord, + agent.PolicyID, + output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds) + if err != nil { + return err + } + } + + return nil + +} + +func (ack *AckT) updateAPIKey(ctx context.Context, + zlog zerolog.Logger, + agentID string, + currRev, currCoord int64, + policyID, apiKeyID, permissionHash string, + toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems) error { + + if apiKeyID != "" { + res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true) + if err != nil { + zlog.Error(). + Err(err). + Str(LogAPIKeyID, apiKeyID). + Msg("Failed to read API Key roles") + } else { + clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors) + if err != nil { + zlog.Error(). + Err(err). + RawJSON("roles", res.RoleDescriptors). + Str(LogAPIKeyID, apiKeyID). + Msg("Failed to cleanup roles") + } else if removedRolesCount > 0 { + if err := ack.bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil { + zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Msg("Failed to update API Key") + } else { + zlog.Debug(). + Str("hash.sha256", permissionHash). + Str(LogAPIKeyID, apiKeyID). + RawJSON("roles", clean). + Int("removedRoles", removedRolesCount). + Msg("Updating agent record to pick up reduced roles.") + } + } + } + ack.invalidateAPIKeys(ctx, toRetireAPIKeyIDs, apiKeyID) + } body := makeUpdatePolicyBody( - agent.PolicyID, + policyID, currRev, currCoord, ) @@ -362,14 +419,14 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag err := ack.bulk.Update( ctx, dl.FleetAgents, - agent.Id, + agentID, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3), ) - zlog.Info().Err(err). - Str(LogPolicyID, agent.PolicyID). + zlog.Err(err). + Str(LogPolicyID, policyID). Int64("policyRevision", currRev). Int64("policyCoordinator", currCoord). Msg("ack policy") @@ -377,12 +434,38 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag return errors.Wrap(err, "handlePolicyChange update") } -func (ack *AckT) invalidateAPIKeys(ctx context.Context, agent *model.Agent) { - var ids []string - for _, out := range agent.Outputs { - for _, k := range out.ToRetireAPIKeyIds { - ids = append(ids, k.ID) +func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) { + rr := smap.Map{} + if err := json.Unmarshal(roles, &rr); err != nil { + return nil, 0, errors.Wrap(err, "failed to unmarshal provided roles") + } + + keys := make([]string, 0, len(rr)) + for k := range rr { + if strings.HasSuffix(k, "-rdstale") { + keys = append(keys, k) + } + } + + if len(keys) == 0 { + return roles, 0, nil + } + + for _, k := range keys { + delete(rr, k) + } + + r, err := json.Marshal(rr) + return r, len(keys), errors.Wrap(err, "failed to marshal resulting role definition") +} + +func (ack *AckT) invalidateAPIKeys(ctx context.Context, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, skip string) { + ids := make([]string, 0, len(toRetireAPIKeyIDs)) + for _, k := range toRetireAPIKeyIDs { + if k.ID == skip || k.ID == "" { + continue } + ids = append(ids, k.ID) } if len(ids) > 0 { diff --git a/internal/pkg/api/handleAck_test.go b/internal/pkg/api/handleAck_test.go index 60a265bd4..29c678c24 100644 --- a/internal/pkg/api/handleAck_test.go +++ b/internal/pkg/api/handleAck_test.go @@ -452,7 +452,16 @@ func TestInvalidateAPIKeys(t *testing.T) { }} var toRetire3 []model.ToRetireAPIKeyIdsItems - want := []string{"toRetire1", "toRetire2_0", "toRetire2_1"} + skips := map[string]string{ + "1": "toRetire1", + "2": "toRetire2_0", + "3": "", + } + wants := map[string][]string{ + "1": {}, + "2": {"toRetire2_1"}, + "3": {}, + } agent := model.Agent{ Outputs: map[string]*model.PolicyOutput{ @@ -462,17 +471,24 @@ func TestInvalidateAPIKeys(t *testing.T) { }, } - bulker := ftesting.NewMockBulk() - bulker.On("APIKeyInvalidate", - context.Background(), mock.MatchedBy(func(ids []string) bool { - // if A contains B and B contains A => A = B - return assert.Subset(t, ids, want) && - assert.Subset(t, want, ids) - })). - Return(nil) + for i, out := range agent.Outputs { + skip := skips[i] + want := wants[i] + + bulker := ftesting.NewMockBulk() + if len(want) > 0 { + bulker.On("APIKeyInvalidate", + context.Background(), mock.MatchedBy(func(ids []string) bool { + // if A contains B and B contains A => A = B + return assert.Subset(t, ids, want) && + assert.Subset(t, want, ids) + })). + Return(nil) + } - ack := &AckT{bulk: bulker} - ack.invalidateAPIKeys(context.Background(), &agent) + ack := &AckT{bulk: bulker} + ack.invalidateAPIKeys(context.Background(), out.ToRetireAPIKeyIds, skip) - bulker.AssertExpectations(t) + bulker.AssertExpectations(t) + } } diff --git a/internal/pkg/api/handleEnroll.go b/internal/pkg/api/handleEnroll.go index 7c5b1dd5a..9123723d6 100644 --- a/internal/pkg/api/handleEnroll.go +++ b/internal/pkg/api/handleEnroll.go @@ -298,7 +298,7 @@ func invalidateAPIKey(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk LOOP: for { - _, err := bulker.APIKeyRead(ctx, apikeyID) + _, err := bulker.APIKeyRead(ctx, apikeyID, false) switch { case err == nil: diff --git a/internal/pkg/apikey/apikey.go b/internal/pkg/apikey/apikey.go index 4134f2b0d..05551f272 100644 --- a/internal/pkg/apikey/apikey.go +++ b/internal/pkg/apikey/apikey.go @@ -6,11 +6,9 @@ package apikey import ( - "bytes" "context" "encoding/base64" "encoding/json" - "errors" "fmt" "net/http" "strings" @@ -18,6 +16,7 @@ import ( "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/pkg/errors" ) const ( @@ -36,20 +35,26 @@ var AuthKey = http.CanonicalHeaderKey("Authorization") // APIKeyMetadata tracks Metadata associated with an APIKey. type APIKeyMetadata struct { - ID string - Metadata Metadata + ID string + Metadata Metadata + RoleDescriptors json.RawMessage } // Read gathers APIKeyMetadata from Elasticsearch using the given client. -func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) { +func Read(ctx context.Context, client *elasticsearch.Client, id string, withOwner bool) (*APIKeyMetadata, error) { + opts := []func(*esapi.SecurityGetAPIKeyRequest){ client.Security.GetAPIKey.WithContext(ctx), client.Security.GetAPIKey.WithID(id), } + if withOwner { + opts = append(opts, client.Security.GetAPIKey.WithOwner(true)) + } res, err := client.Security.GetAPIKey( opts..., ) + if err != nil { return nil, fmt.Errorf("request to elasticsearch failed: %w", err) } @@ -60,22 +65,19 @@ func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKey } type APIKeyResponse struct { - ID string `json:"id"` - Metadata Metadata `json:"metadata"` + ID string `json:"id"` + Metadata Metadata `json:"metadata"` + RoleDescriptors json.RawMessage `json:"role_descriptors"` } type GetAPIKeyResponse struct { APIKeys []APIKeyResponse `json:"api_keys"` } - var buff bytes.Buffer - if _, err := buff.ReadFrom(res.Body); err != nil { - return nil, fmt.Errorf("could not read from response body: %w", err) - } - var resp GetAPIKeyResponse - if err = json.Unmarshal(buff.Bytes(), &resp); err != nil { + d := json.NewDecoder(res.Body) + if err = d.Decode(&resp); err != nil { return nil, fmt.Errorf( - "could not Unmarshal elasticsearch GetAPIKeyResponse: %w", err) + "could not decode elasticsearch GetAPIKeyResponse: %w", err) } if len(resp.APIKeys) == 0 { @@ -83,9 +85,11 @@ func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKey } first := resp.APIKeys[0] + return &APIKeyMetadata{ - ID: first.ID, - Metadata: first.Metadata, + ID: first.ID, + Metadata: first.Metadata, + RoleDescriptors: first.RoleDescriptors, }, nil } diff --git a/internal/pkg/apikey/apikey_integration_test.go b/internal/pkg/apikey/apikey_integration_test.go index 72f410d99..ce4529254 100644 --- a/internal/pkg/apikey/apikey_integration_test.go +++ b/internal/pkg/apikey/apikey_integration_test.go @@ -30,7 +30,58 @@ const testFleetRoles = ` } ` -func TestRead(t *testing.T) { +func TestRead_existingKey(t *testing.T) { + ctx, cn := context.WithCancel(context.Background()) + defer cn() + + cfg := elasticsearch.Config{ + Username: "elastic", + Password: "changeme", + } + + es, err := elasticsearch.NewClient(cfg) + if err != nil { + t.Fatal(err) + } + + // Create the key + agentID := uuid.Must(uuid.NewV4()).String() + name := uuid.Must(uuid.NewV4()).String() + akey, err := Create(ctx, es, name, "", "true", []byte(testFleetRoles), + NewMetadata(agentID, "", TypeAccess)) + if err != nil { + t.Fatal(err) + } + + // Get the key and verify that metadata was saved correctly + aKeyMeta, err := Read(ctx, es, akey.ID, false) + if err != nil { + t.Fatal(err) + } + + diff := cmp.Diff(ManagedByFleetServer, aKeyMeta.Metadata.ManagedBy) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(true, aKeyMeta.Metadata.Managed) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(agentID, aKeyMeta.Metadata.AgentID) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(TypeAccess.String(), aKeyMeta.Metadata.Type) + if diff != "" { + t.Error(diff) + } + +} + +func TestRead_noKey(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() @@ -45,12 +96,12 @@ func TestRead(t *testing.T) { } // Try to get the key that doesn't exist, expect ErrApiKeyNotFound - _, err = Read(ctx, es, "0000000000000") + _, err = Read(ctx, es, "0000000000000", false) if !errors.Is(err, ErrAPIKeyNotFound) { - t.Errorf("Unexpected error type: %v", err) + t.Errorf("Unexpected error: %v", err) } - } + func TestCreateAPIKeyWithMetadata(t *testing.T) { tts := []struct { name string @@ -92,7 +143,7 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) { } // Get the API key and verify that the metadata was saved correctly - aKeyMeta, err := Read(ctx, es, apiKey.ID) + aKeyMeta, err := Read(ctx, es, apiKey.ID, false) if err != nil { t.Fatal(err) } diff --git a/internal/pkg/bulk/block.go b/internal/pkg/bulk/block.go index 28c80927e..c2535172e 100644 --- a/internal/pkg/bulk/block.go +++ b/internal/pkg/bulk/block.go @@ -43,6 +43,7 @@ const ( ActionDelete ActionIndex ActionUpdate + ActionUpdateAPIKey ActionRead ActionSearch ActionFleetSearch @@ -53,6 +54,7 @@ var actionStrings = []string{ "delete", "index", "update", + "update_api_key", "read", "search", "fleet_search", diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 93420c780..68840729c 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -55,9 +55,10 @@ type Bulk interface { // APIKey operations APIKeyCreate(ctx context.Context, name, ttl string, roles []byte, meta interface{}) (*APIKey, error) - APIKeyRead(ctx context.Context, id string) (*APIKeyMetadata, error) + APIKeyRead(ctx context.Context, id string, withOwner bool) (*APIKeyMetadata, error) APIKeyAuth(ctx context.Context, key APIKey) (*SecurityInfo, error) APIKeyInvalidate(ctx context.Context, ids ...string) error + APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error // Accessor used to talk to elastic search direcly bypassing bulk engine Client() *elasticsearch.Client @@ -81,6 +82,7 @@ const ( defaultMaxPending = 32 defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast defaultAPIKeyMaxParallel = 32 + defaultApikeyMaxReqSize = 100 * 1024 * 1024 ) func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker { @@ -136,6 +138,8 @@ func blkToQueueType(blk *bulkT) queueType { } else { queueIdx = kQueueRead } + case ActionUpdateAPIKey: + queueIdx = kQueueAPIKeyUpdate default: if forceRefresh { queueIdx = kQueueRefreshBulk @@ -288,6 +292,8 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu err = b.flushRead(ctx, queue) case kQueueSearch, kQueueFleetSearch: err = b.flushSearch(ctx, queue) + case kQueueAPIKeyUpdate: + err = b.flushUpdateAPIKey(ctx, queue) default: err = b.flushBulk(ctx, queue) } diff --git a/internal/pkg/bulk/opApiKey.go b/internal/pkg/bulk/opApiKey.go index 049c0ce17..099a7d291 100644 --- a/internal/pkg/bulk/opApiKey.go +++ b/internal/pkg/bulk/opApiKey.go @@ -5,15 +5,36 @@ package bulk import ( + "bytes" "context" + "encoding/json" + "math" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" + "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/rs/zerolog/log" +) + +const ( + envelopeSize = 64 // 64B + safeBuffer = 0.9 ) // The ApiKey API's are not yet bulk enabled. Stub the calls in the bulker // and limit parallel access to prevent many requests from overloading // the connection pool in the elastic search client. +type apiKeyUpdateRequest struct { + ID string `json:"id,omitempty"` + Roles json.RawMessage `json:"role_descriptors,omitempty"` + RolesHash string `json:"role_hash,omitempty"` +} + +type esAPIKeyBulkUpdateRequest struct { + IDs []string `json:"ids,omitempty"` + Roles json.RawMessage `json:"role_descriptors,omitempty"` +} + func (b *Bulker) APIKeyAuth(ctx context.Context, key APIKey) (*SecurityInfo, error) { if err := b.apikeyLimit.Acquire(ctx, 1); err != nil { return nil, err @@ -32,13 +53,13 @@ func (b *Bulker) APIKeyCreate(ctx context.Context, name, ttl string, roles []byt return apikey.Create(ctx, b.Client(), name, ttl, "false", roles, meta) } -func (b *Bulker) APIKeyRead(ctx context.Context, id string) (*APIKeyMetadata, error) { +func (b *Bulker) APIKeyRead(ctx context.Context, id string, withOwner bool) (*APIKeyMetadata, error) { if err := b.apikeyLimit.Acquire(ctx, 1); err != nil { return nil, err } defer b.apikeyLimit.Release(1) - return apikey.Read(ctx, b.Client(), id) + return apikey.Read(ctx, b.Client(), id, withOwner) } func (b *Bulker) APIKeyInvalidate(ctx context.Context, ids ...string) error { @@ -49,3 +70,178 @@ func (b *Bulker) APIKeyInvalidate(ctx context.Context, ids ...string) error { return apikey.Invalidate(ctx, b.Client(), ids...) } + +func (b *Bulker) APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error { + req := &apiKeyUpdateRequest{ + ID: id, + Roles: roles, + RolesHash: outputPolicyHash, + } + + body, err := json.Marshal(req) + if err != nil { + return err + } + + _, err = b.waitBulkAction(ctx, ActionUpdateAPIKey, "", id, body) + return err +} + +// flushUpdateAPIKey takes an update API Key queue and groups request based on roles applied +// It needs to group agent IDs per Role Hash in order to produce more efficient request containing a list of IDs for a change(update) +// One thing to have in mind is that in a single queue there may be change and ack request with roles. in this case +// Later occurrence wins overwriting policy change to reduced set of permissions. +// Even if the order was incorrect we end up with just a bit broader permission set, never too strict, so agent does not +// end up with fewer permissions than it needs +func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { + idsPerRole := make(map[string][]string) + roles := make(map[string]json.RawMessage) + rolePerID := make(map[string]string) + responses := make(map[int]int) + idxToID := make(map[int32]string) + IDToResponse := make(map[string]int) + maxKeySize := 0 + + // merge ids + for n := queue.head; n != nil; n = n.next { + content := n.buf.Bytes() + metaMap := make(map[string]interface{}) + dec := json.NewDecoder(bytes.NewReader(content)) + if err := dec.Decode(&metaMap); err != nil { + log.Error(). + Err(err). + Str("mod", kModBulk). + Msg("Failed to unmarshal api key update meta map") + return err + } + + var req *apiKeyUpdateRequest + if err := dec.Decode(&req); err != nil { + log.Error(). + Err(err). + Str("mod", kModBulk). + Str("request", string(content)). + Msg("Failed to unmarshal api key update request") + return err + } + + if _, tracked := roles[req.RolesHash]; !tracked { + roles[req.RolesHash] = req.Roles + } + + // last one wins, it may be policy change and ack are in the same queue + rolePerID[req.ID] = req.RolesHash + idxToID[n.idx] = req.ID + if maxKeySize < len(req.ID) { + maxKeySize = len(req.ID) + } + } + + for id, roleHash := range rolePerID { + delete(rolePerID, id) + idsPerRole[roleHash] = append(idsPerRole[roleHash], id) + + } + + responseIdx := 0 + for hash, role := range roles { + idsPerBatch := b.getIDsCountPerBatch(len(role), maxKeySize) + ids := idsPerRole[hash] + if idsPerBatch <= 0 { + log.Error().Str("err", "request too large").Msg("No API Key ID could fit request size for bulk update") + log.Debug(). + RawJSON("role", role). + Strs("ids", ids). + Msg("IDs could not fit into a message") + + // idsPerRole for specific role no longer needed + delete(idsPerRole, hash) + continue + } + + batches := int(math.Ceil(float64(len(ids)) / float64(idsPerBatch))) + + // batch ids into batches of meaningful size + for batch := 0; batch < batches; batch++ { + // guard against indexing out of range + to := (batch + 1) * idsPerBatch + if to > len(ids) { + to = len(ids) + } + + // handle ids in batch, we put them into single request + // and assign response index to the id so we can notify caller + idsInBatch := ids[batch*idsPerBatch : to] + bulkReq := &esAPIKeyBulkUpdateRequest{ + IDs: idsInBatch, + Roles: role, + } + delete(roles, hash) + + payload, err := json.Marshal(bulkReq) + if err != nil { + return err + } + + req := &es.UpdateApiKeyBulkRequest{ + Body: bytes.NewReader(payload), + } + + res, err := req.Do(ctx, b.es) + if err != nil { + log.Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch") + return err + } + if res.Body != nil { + defer res.Body.Close() + } + if res.IsError() { + log.Error().Str("err", res.String()).Msg("Error in bulk API Key update result to Elasticsearch") + return parseError(res) + } + + log.Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("API Keys updated.") + + responses[responseIdx] = res.StatusCode + for _, id := range idsInBatch { + IDToResponse[id] = responseIdx + } + responseIdx++ + } + + // idsPerRole for specific role no longer needed + delete(idsPerRole, hash) + } + + // WARNING: Once we start pushing items to + // the queue, the node pointers are invalid. + // Do NOT return a non-nil value or failQueue + // up the stack will fail. + + for n := queue.head; n != nil; n = n.next { + // 'n' is invalid immediately on channel send + responseIdx := IDToResponse[idxToID[n.idx]] + res := responses[responseIdx] + select { + case n.ch <- respT{ + err: nil, + idx: n.idx, + data: &BulkIndexerResponseItem{ + DocumentID: "", + Status: res, + }, + }: + default: + panic("Unexpected blocked response channel on flushRead") + } + } + return nil +} + +func (b *Bulker) getIDsCountPerBatch(roleSize, maxKeySize int) int { + spareSpace := b.opts.apikeyMaxReqSize - roleSize - envelopeSize + if spareSpace > maxKeySize { + return int(float64(spareSpace) * safeBuffer / float64(maxKeySize)) + } + return 0 +} diff --git a/internal/pkg/bulk/opt.go b/internal/pkg/bulk/opt.go index e0701823e..6eeb2fe21 100644 --- a/internal/pkg/bulk/opt.go +++ b/internal/pkg/bulk/opt.go @@ -62,6 +62,7 @@ type bulkOptT struct { maxPending int blockQueueSz int apikeyMaxParallel int + apikeyMaxReqSize int } type BulkOpt func(*bulkOptT) @@ -108,6 +109,15 @@ func WithAPIKeyMaxParallel(max int) BulkOpt { } } +// WithAPIKeyMaxRequestSize sets the maximum size of the request body. Default 100MB +func WithAPIKeyMaxRequestSize(maxBytes int) BulkOpt { + return func(opt *bulkOptT) { + if opt.apikeyMaxReqSize > 0 { + opt.apikeyMaxReqSize = maxBytes + } + } +} + func parseBulkOpts(opts ...BulkOpt) bulkOptT { bopt := bulkOptT{ flushInterval: defaultFlushInterval, @@ -116,6 +126,7 @@ func parseBulkOpts(opts ...BulkOpt) bulkOptT { maxPending: defaultMaxPending, apikeyMaxParallel: defaultAPIKeyMaxParallel, blockQueueSz: defaultBlockQueueSz, + apikeyMaxReqSize: defaultApikeyMaxReqSize, } for _, f := range opts { @@ -132,6 +143,7 @@ func (o *bulkOptT) MarshalZerologObject(e *zerolog.Event) { e.Int("maxPending", o.maxPending) e.Int("blockQueueSz", o.blockQueueSz) e.Int("apikeyMaxParallel", o.apikeyMaxParallel) + e.Int("apikeyMaxReqSize", o.apikeyMaxReqSize) } // BulkOptsFromCfg transforms config to a slize of BulkOpt @@ -152,5 +164,6 @@ func BulkOptsFromCfg(cfg *config.Config) []BulkOpt { WithFlushThresholdSize(bulkCfg.FlushThresholdSize), WithMaxPending(bulkCfg.FlushMaxPending), WithAPIKeyMaxParallel(maxKeyParallel), + WithAPIKeyMaxRequestSize(cfg.Output.Elasticsearch.MaxContentLength), } } diff --git a/internal/pkg/bulk/queue.go b/internal/pkg/bulk/queue.go index dc7bde5d1..f2060212a 100644 --- a/internal/pkg/bulk/queue.go +++ b/internal/pkg/bulk/queue.go @@ -20,6 +20,7 @@ const ( kQueueFleetSearch kQueueRefreshBulk kQueueRefreshRead + kQueueAPIKeyUpdate kNumQueues ) @@ -37,6 +38,8 @@ func (q queueT) Type() string { return "refreshBulk" case kQueueRefreshRead: return "refreshRead" + case kQueueAPIKeyUpdate: + return "apiKeyUpdate" } panic("unknown") } diff --git a/internal/pkg/config/config_test.go b/internal/pkg/config/config_test.go index 4e3d7ea61..110f93783 100644 --- a/internal/pkg/config/config_test.go +++ b/internal/pkg/config/config_test.go @@ -237,12 +237,13 @@ func defaultFleet() Fleet { func defaultElastic() Elasticsearch { return Elasticsearch{ - Protocol: "http", - ServiceToken: "test-token", - Hosts: []string{"localhost:9200"}, - MaxRetries: 3, - MaxConnPerHost: 128, - Timeout: 90 * time.Second, + Protocol: "http", + ServiceToken: "test-token", + Hosts: []string{"localhost:9200"}, + MaxRetries: 3, + MaxConnPerHost: 128, + MaxContentLength: 104857600, + Timeout: 90 * time.Second, } } diff --git a/internal/pkg/config/output.go b/internal/pkg/config/output.go index 5804d5858..8e8751d45 100644 --- a/internal/pkg/config/output.go +++ b/internal/pkg/config/output.go @@ -28,19 +28,20 @@ var hasScheme = regexp.MustCompile(`^([a-z][a-z0-9+\-.]*)://`) // Elasticsearch is the configuration for elasticsearch. type Elasticsearch struct { - Protocol string `config:"protocol"` - Hosts []string `config:"hosts"` - Path string `config:"path"` - Headers map[string]string `config:"headers"` - APIKey string `config:"api_key"` - ServiceToken string `config:"service_token"` - ProxyURL string `config:"proxy_url"` - ProxyDisable bool `config:"proxy_disable"` - ProxyHeaders map[string]string `config:"proxy_headers"` - TLS *tlscommon.Config `config:"ssl"` - MaxRetries int `config:"max_retries"` - MaxConnPerHost int `config:"max_conn_per_host"` - Timeout time.Duration `config:"timeout"` + Protocol string `config:"protocol"` + Hosts []string `config:"hosts"` + Path string `config:"path"` + Headers map[string]string `config:"headers"` + APIKey string `config:"api_key"` + ServiceToken string `config:"service_token"` + ProxyURL string `config:"proxy_url"` + ProxyDisable bool `config:"proxy_disable"` + ProxyHeaders map[string]string `config:"proxy_headers"` + TLS *tlscommon.Config `config:"ssl"` + MaxRetries int `config:"max_retries"` + MaxConnPerHost int `config:"max_conn_per_host"` + Timeout time.Duration `config:"timeout"` + MaxContentLength int `config:"max_content_length"` } // InitDefaults initializes the defaults for the configuration. @@ -50,6 +51,7 @@ func (c *Elasticsearch) InitDefaults() { c.Timeout = 90 * time.Second c.MaxRetries = 3 c.MaxConnPerHost = 128 + c.MaxContentLength = 100 * 1024 * 1024 } // Validate ensures that the configuration is valid. diff --git a/internal/pkg/es/bulk_update_api_key.go b/internal/pkg/es/bulk_update_api_key.go new file mode 100644 index 000000000..e75df2996 --- /dev/null +++ b/internal/pkg/es/bulk_update_api_key.go @@ -0,0 +1,109 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated from specification version 7.x: DO NOT EDIT + +// This is a copy of api.search.go file from go-elasticsearch library +// It was modified for /_fleet/_fleet_search experimental API, +// implemented by the custom fleet plugin https://github.com/elastic/elasticsearch/pull/73134 +// This file can be removed and replaced with the official client library wrapper once it is available + +package es + +import ( + "context" + "io" + "net/http" + "strings" + + "github.com/elastic/go-elasticsearch/v7/esapi" +) + +const updateAPIKeyPath = "/_security/api_key/_bulk_update" + +type UpdateApiKeyBulk func(o ...func(*UpdateApiKeyBulkRequest)) (*Response, error) + +type UpdateApiKeyBulkRequest struct { + Body io.Reader + + Header http.Header + + ctx context.Context +} + +// Do executes the request and returns response or error. +// +func (r UpdateApiKeyBulkRequest) Do(ctx context.Context, transport esapi.Transport) (*esapi.Response, error) { + var path strings.Builder + + path.Grow(len(updateAPIKeyPath)) + path.WriteString(updateAPIKeyPath) + + req, err := newRequest(http.MethodPost, path.String(), r.Body) + if err != nil { + return nil, err + } + + if r.Body != nil { + req.Header[headerContentType] = headerContentTypeJSON + } + + if len(r.Header) > 0 { + if len(req.Header) == 0 { + req.Header = r.Header + } else { + for k, vv := range r.Header { + for _, v := range vv { + req.Header.Add(k, v) + } + } + } + } + + if ctx != nil { + req = req.WithContext(ctx) + } + + res, err := transport.Perform(req) + if err != nil { + return nil, err + } + + response := esapi.Response{ + StatusCode: res.StatusCode, + Body: res.Body, + Header: res.Header, + } + + return &response, nil +} + +// WithContext sets the request context. +// +func (f UpdateApiKeyBulkRequest) WithContext(v context.Context) func(*UpdateApiKeyBulkRequest) { + return func(r *UpdateApiKeyBulkRequest) { + r.ctx = v + } +} + +// WithBody - The search definition using the Query DSL. +// +func (f UpdateApiKeyBulkRequest) WithBody(v io.Reader) func(*UpdateApiKeyBulkRequest) { + return func(r *UpdateApiKeyBulkRequest) { + r.Body = v + } +} + +// WithHeader adds the headers to the HTTP request. +// +func (f UpdateApiKeyBulkRequest) WithHeader(h map[string]string) func(*UpdateApiKeyBulkRequest) { + return func(r *UpdateApiKeyBulkRequest) { + if r.Header == nil { + r.Header = make(http.Header) + } + for k, v := range h { + r.Header.Add(k, v) + } + } +} diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index fefd192d3..55683cf51 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -91,22 +91,78 @@ func (p *Output) prepareElasticsearch( // Note: This will need to be updated when doing multi-cluster elasticsearch support // Currently, we assume all ES outputs are the same ES fleet-server is connected to. - needNewKey := true + needNewKey := false + needUpdateKey := false switch { case output.APIKey == "": zlog.Debug().Msg("must generate api key as default API key is not present") + needNewKey = true case p.Role.Sha2 != output.PermissionsHash: // the is actually the OutputPermissionsHash for the default hash. The Agent // document on ES does not have OutputPermissionsHash for any other output // besides the default one. It seems to me error-prone to rely on the default // output permissions hash to generate new API keys for other outputs. zlog.Debug().Msg("must generate api key as policy output permissions changed") + needUpdateKey = true default: - needNewKey = false zlog.Debug().Msg("policy output permissions are the same") } - if needNewKey { + if needUpdateKey { + zlog.Debug(). + RawJSON("roles", p.Role.Raw). + Str("oldHash", output.PermissionsHash). + Str("newHash", p.Role.Sha2). + Msg("Generating a new API key") + + // query current api key for roles so we don't lose permissions in the meantime + currentRoles, err := fetchAPIKeyRoles(ctx, bulker, output.APIKeyID) + if err != nil { + zlog.Error(). + Str("apiKeyID", output.APIKeyID). + Err(err).Msg("fail fetching roles for key") + return err + } + + // merge roles with p.Role + newRoles, err := mergeRoles(zlog, currentRoles, p.Role) + if err != nil { + zlog.Error(). + Str("apiKeyID", output.APIKeyID). + Err(err).Msg("fail merging roles for key") + return err + } + + // hash provided is only for merging request together and not persisted + err = bulker.APIKeyUpdate(ctx, output.APIKeyID, newRoles.Sha2, newRoles.Raw) + if err != nil { + zlog.Error().Err(err).Msg("fail generate output key") + zlog.Debug().RawJSON("roles", newRoles.Raw).Str("sha", newRoles.Sha2).Err(err).Msg("roles not updated") + return err + } + + output.PermissionsHash = p.Role.Sha2 // for the sake of consistency + zlog.Debug(). + Str("hash.sha256", p.Role.Sha2). + Str("roles", string(p.Role.Raw)). + Msg("Updating agent record to pick up most recent roles.") + + fields := map[string]interface{}{ + dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, + } + + // Using painless script to update permission hash for updated key + body, err := renderUpdatePainlessScript(p.Name, fields) + if err != nil { + return err + } + + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil { + zlog.Error().Err(err).Msg("fail update agent record") + return err + } + + } else if needNewKey { zlog.Debug(). RawJSON("fleet.policy.roles", p.Role.Raw). Str("fleet.policy.default.oldHash", output.PermissionsHash). @@ -181,6 +237,114 @@ func (p *Output) prepareElasticsearch( return nil } +func fetchAPIKeyRoles(ctx context.Context, b bulk.Bulk, apiKeyID string) (*RoleT, error) { + res, err := b.APIKeyRead(ctx, apiKeyID, true) + if err != nil { + return nil, err + } + + roleMap, err := smap.Parse(res.RoleDescriptors) + if err != nil { + return nil, err + } + r := &RoleT{ + Raw: res.RoleDescriptors, + } + + // Stable hash on permissions payload + if r.Sha2, err = roleMap.Hash(); err != nil { + return nil, err + } + + return r, nil +} + +// mergeRoles takes old and new role sets and merges them following these rules: +// - take all new roles +// - append all old roles +// to avoid name collisions every old entry has a `rdstale` suffix +// if rdstale suffix already exists it uses `{index}-rdstale` to avoid further collisions +// everything ending with `rdstale` is removed on ack. +// in case we have key `123` in both old and new result will be: {"123", "123-0-rdstale"} +// in case old contains {"123", "123-0-rdstale"} and new contains {"123"} result is: {"123", "123-rdstale", "123-0-rdstale"} +func mergeRoles(zlog zerolog.Logger, old, new *RoleT) (*RoleT, error) { + if old == nil { + return new, nil + } + if new == nil { + return old, nil + } + + oldMap, err := smap.Parse(old.Raw) + if err != nil { + return nil, err + } + if oldMap == nil { + return new, nil + } + + newMap, err := smap.Parse(new.Raw) + if err != nil { + return nil, err + } + if newMap == nil { + return old, nil + } + + destMap := smap.Map{} + // copy all from new + for k, v := range newMap { + destMap[k] = v + } + + findNewKey := func(m smap.Map, candidate string) string { + if strings.HasSuffix(candidate, "-rdstale") { + candidate = strings.TrimSuffix(candidate, "-rdstale") + dashIdx := strings.LastIndex(candidate, "-") + if dashIdx >= 0 { + candidate = candidate[:dashIdx] + } + + } + + // 1 should be enough, 100 is just to have some space + for i := 0; i < 100; i++ { + c := fmt.Sprintf("%s-%d-rdstale", candidate, i) + + if _, exists := m[c]; !exists { + return c + } + } + + return "" + } + // copy old + for k, v := range oldMap { + newKey := findNewKey(destMap, k) + if newKey == "" { + zlog.Warn().Msg("Failed to find a key for role assignement.") + + zlog.Debug(). + RawJSON("roles", new.Raw). + Str("candidate", k). + Msg("roles not included.") + + continue + } + destMap[newKey] = v + } + + r := &RoleT{} + if r.Sha2, err = destMap.Hash(); err != nil { + return nil, err + } + if r.Raw, err = json.Marshal(destMap); err != nil { + return nil, err + } + + return r, nil +} + func renderUpdatePainlessScript(outputName string, fields map[string]interface{}) ([]byte, error) { var source strings.Builder diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go index d66275d04..f74d57b3e 100644 --- a/internal/pkg/policy/policy_output_test.go +++ b/internal/pkg/policy/policy_output_test.go @@ -145,20 +145,22 @@ func TestPolicyOutputESPrepare(t *testing.T) { bulker.AssertExpectations(t) }) - t.Run("Permission hash != Agent Permission Hash need to regenerate the key", func(t *testing.T) { + t.Run("Permission hash != Agent Permission Hash need to regenerate permissions", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() oldAPIKey := bulk.APIKey{ID: "test_id", Key: "EXISTING-KEY"} - wantAPIKey := bulk.APIKey{ID: "abc", Key: "new-key"} + wantAPIKey := bulk.APIKey{ID: "test_id", Key: "EXISTING-KEY"} hashPerm := "old-HASH" + bulker. + On("APIKeyRead", mock.Anything, mock.Anything, mock.Anything). + Return(&bulk.APIKeyMetadata{ID: "test_id", RoleDescriptors: TestPayload}, nil). + Once() bulker.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil).Once() - bulker.On("APIKeyCreate", - mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(&wantAPIKey, nil).Once() + bulker.On("APIKeyUpdate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() output := Output{ Type: OutputTypeElasticsearch, diff --git a/internal/pkg/testing/bulk.go b/internal/pkg/testing/bulk.go index 1123232b7..724d54086 100644 --- a/internal/pkg/testing/bulk.go +++ b/internal/pkg/testing/bulk.go @@ -83,7 +83,7 @@ func (m *MockBulk) APIKeyCreate(ctx context.Context, name, ttl string, roles []b return args.Get(0).(*bulk.APIKey), args.Error(1) } -func (m *MockBulk) APIKeyRead(ctx context.Context, id string) (*bulk.APIKeyMetadata, error) { +func (m *MockBulk) APIKeyRead(ctx context.Context, id string, _ bool) (*bulk.APIKeyMetadata, error) { args := m.Called(ctx, id) return args.Get(0).(*bulk.APIKeyMetadata), args.Error(1) } @@ -98,4 +98,9 @@ func (m *MockBulk) APIKeyInvalidate(ctx context.Context, ids ...string) error { return args.Error(0) } +func (m *MockBulk) APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error { + args := m.Called(ctx, id) + return args.Error(0) +} + var _ bulk.Bulk = (*MockBulk)(nil) From 191057d05f9fe17093f05a2b12bccfc92bbfef7e Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Tue, 20 Sep 2022 14:21:15 +0200 Subject: [PATCH 4/5] fix imports --- internal/pkg/api/handleAck.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 543b8d317..c69a65b2c 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -15,13 +15,14 @@ import ( "strings" "time" + "github.com/julienschmidt/httprouter" "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" - - - "github.com/elastic/fleet-server/v7/internal/pkg/config" + "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/limit" @@ -29,10 +30,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/policy" "github.com/elastic/fleet-server/v7/internal/pkg/smap" - - "github.com/julienschmidt/httprouter" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" ) type HTTPError struct { From e7756bd3f3f7d532a5ade9a01526d10418a5f3d0 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Tue, 20 Sep 2022 14:26:20 +0200 Subject: [PATCH 5/5] fix lint issue --- internal/pkg/dl/migration_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/dl/migration_integration_test.go b/internal/pkg/dl/migration_integration_test.go index 183502b94..55a2e61d7 100644 --- a/internal/pkg/dl/migration_integration_test.go +++ b/internal/pkg/dl/migration_integration_test.go @@ -204,7 +204,7 @@ func TestMigrateOutputs_withDefaultAPIKeyHistory(t *testing.T) { } func TestMigrateOutputs_nil_DefaultAPIKeyHistory(t *testing.T) { - wantOutputType := "elasticsearch" //nolint:goconst // test cases have some duplication + wantOutputType := "elasticsearch" now, err := time.Parse(time.RFC3339, nowStr) require.NoError(t, err, "could not parse time "+nowStr)