diff --git a/cmd/fleet/handleEnroll.go b/cmd/fleet/handleEnroll.go index 31613971a..7d05631fc 100644 --- a/cmd/fleet/handleEnroll.go +++ b/cmd/fleet/handleEnroll.go @@ -443,7 +443,7 @@ func generateOutputApiKey(ctx context.Context, bulk bulk.Bulk, agentId, outputNa func (et *EnrollerT) fetchEnrollmentKeyRecord(ctx context.Context, id string) (*model.EnrollmentAPIKey, error) { - if key, ok := et.cache.GetEnrollmentApiKey(id); ok { + if key, ok := et.cache.GetEnrollmentAPIKey(id); ok { return &key, nil } @@ -458,7 +458,7 @@ func (et *EnrollerT) fetchEnrollmentKeyRecord(ctx context.Context, id string) (* } cost := int64(len(rec.APIKey)) - et.cache.SetEnrollmentApiKey(id, rec, cost) + et.cache.SetEnrollmentAPIKey(id, rec, cost) return &rec, nil } diff --git a/internal/pkg/cache/cache.go b/internal/pkg/cache/cache.go index e917467e3..04921e08e 100644 --- a/internal/pkg/cache/cache.go +++ b/internal/pkg/cache/cache.go @@ -23,17 +23,17 @@ type Cache interface { SetAction(model.Action) GetAction(id string) (model.Action, bool) - SetApiKey(key ApiKey, enabled bool) - ValidApiKey(key ApiKey) bool + SetApiKey(key APIKey, enabled bool) + ValidApiKey(key APIKey) bool - SetEnrollmentApiKey(id string, key model.EnrollmentAPIKey, cost int64) - GetEnrollmentApiKey(id string) (model.EnrollmentAPIKey, bool) + SetEnrollmentAPIKey(id string, key model.EnrollmentAPIKey, cost int64) + GetEnrollmentAPIKey(id string) (model.EnrollmentAPIKey, bool) SetArtifact(artifact model.Artifact) GetArtifact(ident, sha2 string) (model.Artifact, bool) } -type ApiKey = apikey.ApiKey +type APIKey = apikey.ApiKey type SecurityInfo = apikey.SecurityInfo type CacheT struct { @@ -151,7 +151,7 @@ func (c *CacheT) GetAction(id string) (model.Action, bool) { } // SetApiKey sets the API key in the cache. -func (c *CacheT) SetApiKey(key ApiKey, enabled bool) { +func (c *CacheT) SetApiKey(key APIKey, enabled bool) { c.mut.RLock() defer c.mut.RUnlock() @@ -188,7 +188,7 @@ func (c *CacheT) SetApiKey(key ApiKey, enabled bool) { } // ValidApiKey returns true if the ApiKey is valid (aka. also present in cache). -func (c *CacheT) ValidApiKey(key ApiKey) bool { +func (c *CacheT) ValidApiKey(key APIKey) bool { c.mut.RLock() defer c.mut.RUnlock() @@ -210,8 +210,8 @@ func (c *CacheT) ValidApiKey(key ApiKey) bool { return ok } -// GetEnrollmentApiKey returns the enrollment API key by ID. -func (c *CacheT) GetEnrollmentApiKey(id string) (model.EnrollmentAPIKey, bool) { +// GetEnrollmentAPIKey returns the enrollment API key by ID. +func (c *CacheT) GetEnrollmentAPIKey(id string) (model.EnrollmentAPIKey, bool) { c.mut.RLock() defer c.mut.RUnlock() @@ -231,8 +231,8 @@ func (c *CacheT) GetEnrollmentApiKey(id string) (model.EnrollmentAPIKey, bool) { return model.EnrollmentAPIKey{}, false } -// SetEnrollmentApiKey adds the enrollment API key into the cache. -func (c *CacheT) SetEnrollmentApiKey(id string, key model.EnrollmentAPIKey, cost int64) { +// SetEnrollmentAPIKey adds the enrollment API key into the cache. +func (c *CacheT) SetEnrollmentAPIKey(id string, key model.EnrollmentAPIKey, cost int64) { c.mut.RLock() defer c.mut.RUnlock() diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go new file mode 100644 index 000000000..357290258 --- /dev/null +++ b/internal/pkg/policy/policy_output_test.go @@ -0,0 +1,143 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package policy + +import ( + "context" + "testing" + + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/model" + "github.com/elastic/fleet-server/v7/internal/pkg/smap" + ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +var TestPayload []byte + +func TestPolicyLogstashOutputPrepare(t *testing.T) { + bulker := ftesting.NewMockBulk(&bulk.APIKey{ + Id: "test id", + Key: "test key", + }) + po := PolicyOutput{ + Type: OutputTypeLogstash, + Name: "test output", + Role: &RoleT{ + Sha2: "fake sha", + Raw: TestPayload, + }, + } + + err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, &model.Agent{}, smap.Map{}, false) + require.Nil(t, err, "expected prepare to pass") +} +func TestPolicyLogstashOutputPrepareNoRole(t *testing.T) { + bulker := ftesting.NewMockBulk(&bulk.APIKey{ + Id: "test id", + Key: "test key", + }) + po := PolicyOutput{ + Type: OutputTypeLogstash, + Name: "test output", + Role: nil, + } + + err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, &model.Agent{}, smap.Map{}, false) + // No permissions are required by logstash currently + require.Nil(t, err, "expected prepare to pass") +} + +func TestPolicyDefaultLogstashOutputPrepare(t *testing.T) { + bulker := ftesting.NewMockBulk(&bulk.APIKey{ + Id: "test id", + Key: "test key", + }) + po := PolicyOutput{ + Type: OutputTypeLogstash, + Name: "test output", + Role: &RoleT{ + Sha2: "fake sha", + Raw: TestPayload, + }, + } + + err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, &model.Agent{}, smap.Map{}, true) + require.Nil(t, err, "expected prepare to pass") +} + +func TestPolicyESOutputPrepareNoRole(t *testing.T) { + bulker := ftesting.NewMockBulk(&bulk.APIKey{ + Id: "test id", + Key: "test key", + }) + po := PolicyOutput{ + Type: OutputTypeElasticsearch, + Name: "test output", + Role: nil, + } + + err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, &model.Agent{}, smap.Map{}, false) + require.NotNil(t, err, "expected prepare to error") +} + +func TestPolicyOutputESPrepare(t *testing.T) { + bulker := ftesting.NewMockBulk(&bulk.APIKey{ + Id: "test id", + Key: "test key", + }) + po := PolicyOutput{ + Type: OutputTypeElasticsearch, + Name: "test output", + Role: &RoleT{ + Sha2: "fake sha", + Raw: TestPayload, + }, + } + policyMap := smap.Map{ + "test output": map[string]interface{}{ + "api_key": "", + }, + } + + err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, &model.Agent{}, policyMap, false) + require.Nil(t, err, "expected prepare to pass") + + updatedKey, ok := policyMap.GetMap("test output")["api_key"].(*bulk.APIKey) + + require.True(t, ok, "unable to case api key") + require.Equal(t, updatedKey.Key, bulker.MockedAPIKey.Key) + require.Equal(t, updatedKey.Id, bulker.MockedAPIKey.Id) + require.Equal(t, len(bulker.ArgumentData.Update), 0, "update should not be called") +} + +func TestPolicyOutputDefaultESPrepare(t *testing.T) { + bulker := ftesting.NewMockBulk(&bulk.APIKey{ + Id: "test id", + Key: "test key", + }) + po := PolicyOutput{ + Type: OutputTypeElasticsearch, + Name: "test output", + Role: &RoleT{ + Sha2: "fake sha", + Raw: TestPayload, + }, + } + policyMap := smap.Map{ + "test output": map[string]interface{}{}, + } + testAgent := &model.Agent{} + err := po.Prepare(context.Background(), zerolog.Logger{}, bulker, testAgent, policyMap, true) + require.Nil(t, err, "expected prepare to pass") + + updatedKey, ok := policyMap.GetMap("test output")["api_key"].(*bulk.APIKey) + + require.True(t, ok, "unable to case api key") + require.Equal(t, updatedKey.Key, bulker.MockedAPIKey.Key) + require.Equal(t, updatedKey.Id, bulker.MockedAPIKey.Id) + require.Greater(t, len(bulker.ArgumentData.Update), 0, "update should be called") +} diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index 2b67f10da..8fddac939 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -223,11 +223,11 @@ func (m *selfMonitorT) updateStatus(ctx context.Context) (proto.StateObserved_St // no fleet-server input m.status = proto.StateObserved_STARTING if m.policyID == "" { - m.reporter.Status(proto.StateObserved_STARTING, "Waiting on fleet-server input to be added to default policy", nil) + err = m.reporter.Status(proto.StateObserved_STARTING, "Waiting on fleet-server input to be added to default policy", nil) } else { - m.reporter.Status(proto.StateObserved_STARTING, fmt.Sprintf("Waiting on fleet-server input to be added to policy: %s", m.policyID), nil) + err = m.reporter.Status(proto.StateObserved_STARTING, fmt.Sprintf("Waiting on fleet-server input to be added to policy: %s", m.policyID), nil) } - return proto.StateObserved_STARTING, nil + return proto.StateObserved_STARTING, err } status := proto.StateObserved_HEALTHY @@ -247,11 +247,11 @@ func (m *selfMonitorT) updateStatus(ctx context.Context) (proto.StateObserved_St if len(tokens) == 0 { // no tokens created for the policy, still starting if m.policyID == "" { - m.reporter.Status(proto.StateObserved_STARTING, "Waiting on active enrollment keys to be created in default policy with Fleet Server integration", nil) + err = m.reporter.Status(proto.StateObserved_STARTING, "Waiting on active enrollment keys to be created in default policy with Fleet Server integration", nil) } else { - m.reporter.Status(proto.StateObserved_STARTING, fmt.Sprintf("Waiting on active enrollment keys to be created in policy with Fleet Server integration: %s", m.policyID), nil) + err = m.reporter.Status(proto.StateObserved_STARTING, fmt.Sprintf("Waiting on active enrollment keys to be created in policy with Fleet Server integration: %s", m.policyID), nil) } - return proto.StateObserved_STARTING, nil + return proto.StateObserved_STARTING, err } payload = map[string]interface{}{ "enrollment_token": tokens[0].APIKey, @@ -259,11 +259,11 @@ func (m *selfMonitorT) updateStatus(ctx context.Context) (proto.StateObserved_St } m.status = status if m.policyID == "" { - m.reporter.Status(status, fmt.Sprintf("Running on default policy with Fleet Server integration%s", extendMsg), payload) + err = m.reporter.Status(status, fmt.Sprintf("Running on default policy with Fleet Server integration%s", extendMsg), payload) } else { - m.reporter.Status(status, fmt.Sprintf("Running on policy with Fleet Server integration: %s%s", m.policyID, extendMsg), payload) + err = m.reporter.Status(status, fmt.Sprintf("Running on policy with Fleet Server integration: %s%s", m.policyID, extendMsg), payload) } - return status, nil + return status, err } type policyData struct { diff --git a/internal/pkg/policy/self_test.go b/internal/pkg/policy/self_test.go index fdc304bef..98fab51a4 100644 --- a/internal/pkg/policy/self_test.go +++ b/internal/pkg/policy/self_test.go @@ -228,7 +228,7 @@ func TestSelfMonitor_DefaultPolicy_Degraded(t *testing.T) { return nil }, ftesting.RetrySleep(1*time.Second)) - policyId := "fleet-server-policy" + policyID := "fleet-server-policy" rId := xid.New().String() policyContents, err := json.Marshal(&policyData{Inputs: []policyInput{ { @@ -244,7 +244,7 @@ func TestSelfMonitor_DefaultPolicy_Degraded(t *testing.T) { Version: 1, SeqNo: 1, }, - PolicyID: policyId, + PolicyID: policyID, CoordinatorIdx: 1, Data: policyContents, RevisionIdx: 1, @@ -263,7 +263,7 @@ func TestSelfMonitor_DefaultPolicy_Degraded(t *testing.T) { APIKey: "d2JndlFIWUJJUVVxWDVia2NJTV86X0d6ZmljZGNTc1d4R1otbklrZFFRZw==", APIKeyID: xid.New().String(), Name: "Inactive", - PolicyID: policyId, + PolicyID: policyID, } tokenLock.Lock() tokenResult = append(tokenResult, inactiveToken) @@ -304,7 +304,7 @@ func TestSelfMonitor_DefaultPolicy_Degraded(t *testing.T) { APIKey: "d2JndlFIWUJJUVVxWDVia2NJTV86X0d6ZmljZGNTc1d4R1otbklrZFFRZw==", APIKeyID: xid.New().String(), Name: "Active", - PolicyID: policyId, + PolicyID: policyID, } tokenLock.Lock() tokenResult = append(tokenResult, activeToken) @@ -348,11 +348,11 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { ID: "agent-id", }, } - policyId := uuid.Must(uuid.NewV4()).String() + policyID := uuid.Must(uuid.NewV4()).String() reporter := &FakeReporter{} bulker := ftesting.MockBulk{} mm := mock.NewMockIndexMonitor() - monitor := NewSelfMonitor(cfg, bulker, mm, policyId, reporter) + monitor := NewSelfMonitor(cfg, bulker, mm, policyID, reporter) sm := monitor.(*selfMonitorT) sm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) { return []model.Policy{}, nil @@ -376,7 +376,7 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { if status != proto.StateObserved_STARTING { return fmt.Errorf("should be reported as starting; instead its %s", status) } - if msg != fmt.Sprintf("Waiting on policy with Fleet Server integration: %s", policyId) { + if msg != fmt.Sprintf("Waiting on policy with Fleet Server integration: %s", policyID) { return fmt.Errorf("should be matching with specific policy") } return nil @@ -393,7 +393,7 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { Version: 1, SeqNo: 1, }, - PolicyID: policyId, + PolicyID: policyID, CoordinatorIdx: 1, Data: policyContents, RevisionIdx: 2, @@ -419,7 +419,7 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { if status != proto.StateObserved_STARTING { return fmt.Errorf("should be reported as starting; instead its %s", status) } - if msg != fmt.Sprintf("Waiting on fleet-server input to be added to policy: %s", policyId) { + if msg != fmt.Sprintf("Waiting on fleet-server input to be added to policy: %s", policyID) { return fmt.Errorf("should be matching with specific policy") } return nil @@ -440,7 +440,7 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { Version: 1, SeqNo: 2, }, - PolicyID: policyId, + PolicyID: policyID, CoordinatorIdx: 1, Data: policyContents, RevisionIdx: 1, @@ -466,7 +466,7 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { if status != proto.StateObserved_HEALTHY { return fmt.Errorf("should be reported as healthy; instead its %s", status) } - if msg != fmt.Sprintf("Running on policy with Fleet Server integration: %s", policyId) { + if msg != fmt.Sprintf("Running on policy with Fleet Server integration: %s", policyID) { return fmt.Errorf("should be matching with specific policy") } return nil @@ -488,11 +488,11 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { ID: "", }, } - policyId := uuid.Must(uuid.NewV4()).String() + policyID := uuid.Must(uuid.NewV4()).String() reporter := &FakeReporter{} bulker := ftesting.MockBulk{} mm := mock.NewMockIndexMonitor() - monitor := NewSelfMonitor(cfg, bulker, mm, policyId, reporter) + monitor := NewSelfMonitor(cfg, bulker, mm, policyID, reporter) sm := monitor.(*selfMonitorT) sm.checkTime = 100 * time.Millisecond @@ -530,7 +530,7 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { if status != proto.StateObserved_STARTING { return fmt.Errorf("should be reported as starting; instead its %s", status) } - if msg != fmt.Sprintf("Waiting on policy with Fleet Server integration: %s", policyId) { + if msg != fmt.Sprintf("Waiting on policy with Fleet Server integration: %s", policyID) { return fmt.Errorf("should be matching with specific policy") } return nil @@ -551,7 +551,7 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { Version: 1, SeqNo: 1, }, - PolicyID: policyId, + PolicyID: policyID, CoordinatorIdx: 1, Data: policyContents, RevisionIdx: 1, @@ -570,7 +570,7 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { APIKey: "d2JndlFIWUJJUVVxWDVia2NJTV86X0d6ZmljZGNTc1d4R1otbklrZFFRZw==", APIKeyID: xid.New().String(), Name: "Inactive", - PolicyID: policyId, + PolicyID: policyID, } tokenLock.Lock() tokenResult = append(tokenResult, inactiveToken) @@ -596,7 +596,7 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { if status != proto.StateObserved_STARTING { return fmt.Errorf("should be reported as starting; instead its %s", status) } - if msg != fmt.Sprintf("Waiting on active enrollment keys to be created in policy with Fleet Server integration: %s", policyId) { + if msg != fmt.Sprintf("Waiting on active enrollment keys to be created in policy with Fleet Server integration: %s", policyID) { return fmt.Errorf("should be matching with specific policy") } return nil @@ -611,7 +611,7 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { APIKey: "d2JndlFIWUJJUVVxWDVia2NJTV86X0d6ZmljZGNTc1d4R1otbklrZFFRZw==", APIKeyID: xid.New().String(), Name: "Active", - PolicyID: policyId, + PolicyID: policyID, } tokenLock.Lock() tokenResult = append(tokenResult, activeToken) @@ -623,7 +623,7 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { if status != proto.StateObserved_DEGRADED { return fmt.Errorf("should be reported as degraded; instead its %s", status) } - if msg != fmt.Sprintf("Running on policy with Fleet Server integration: %s; missing config fleet.agent.id (expected during bootstrap process)", policyId) { + if msg != fmt.Sprintf("Running on policy with Fleet Server integration: %s; missing config fleet.agent.id (expected during bootstrap process)", policyID) { return fmt.Errorf("should be matching with specific policy") } if payload == nil {