Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alter checkin API to remove attributes set by audit/unenroll API #3827

Merged
merged 16 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; an ~80 character long description of the change.
summary: Checkin after audit/unenroll API

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Fix the behaviour of the checkin handler if the audit/unenroll endpoint was previously called.
Calling the checkin handler will now result in the removal of the audit_unenroll and unenroll_time attributes from the agent doc.

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/fleet-server/pull/3827

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/484
1 change: 0 additions & 1 deletion dev-tools/e2e/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '2.3'
services:
kibana:
image: "docker.elastic.co/kibana/kibana:${ELASTICSEARCH_VERSION}-amd64"
Expand Down
55 changes: 27 additions & 28 deletions dev-tools/integration/docker-compose.yml
michel-laterman marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@

version: '2.3'
volumes:
certs:
driver: local

services:
setup:
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION}-amd64
Expand Down Expand Up @@ -38,12 +35,6 @@ services:
" - localhost\n"\
" ip:\n"\
" - 127.0.0.1\n"\
" - name: es03\n"\
" dns:\n"\
" - es03\n"\
" - localhost\n"\
" ip:\n"\
" - 127.0.0.1\n"\
> config/certs/instances.yml;
bin/elasticsearch-certutil cert --silent --pem -out config/certs/certs.zip --in config/certs/instances.yml --ca-cert config/certs/ca/ca.crt --ca-key config/certs/ca/ca.key;
unzip config/certs/certs.zip -d config/certs;
Expand All @@ -59,23 +50,35 @@ services:
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.seed_hosts=elasticsearch
- network.host="0.0.0.0"
- discovery.type=single-node
- xpack.license.self_generated.type=trial
- xpack.security.http.ssl.enabled=false
- xpack.security.enabled=true
- xpack.security.authc.api_key.enabled=true
- xpack.security.authc.token.enabled=true
#- xpack.security.http.ssl.enabled=true
#- xpack.security.http.ssl.key=certs/es01/es01.key
#- xpack.security.http.ssl.certificate=certs/es01/es01.crt
#- xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
#- xpack.security.transport.ssl.enabled=true
#- xpack.security.transport.ssl.key=certs/es01/es01.key
#- xpack.security.transport.ssl.certificate=certs/es01/es01.crt
#- xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
#- xpack.security.transport.ssl.verification_mode=certificate
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms1G -Xmx1G"
- "ELASTIC_USERNAME=${ELASTICSEARCH_USERNAME}"
- "ELASTIC_PASSWORD=${ELASTICSEARCH_PASSWORD}"
mem_limit: 1073741824 # 1g
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
- certs:/usr/share/elasticsearch/config/certs
ports:
- 127.0.0.1:9200:9200

elasticsearch-remote:
depends_on:
setup:
Expand All @@ -85,9 +88,12 @@ services:
environment:
- node.name=es02
- cluster.name=es-docker-cluster2
- discovery.seed_hosts=elasticsearch
- bootstrap.memory_lock=true
- network.host="0.0.0.0"
- discovery.type=single-node
- xpack.license.self_generated.type=trial
- xpack.security.enabled=true
- xpack.security.authc.api_key.enabled=true
- xpack.security.authc.token.enabled=true
- xpack.security.http.ssl.enabled=true
- xpack.security.http.ssl.key=certs/es02/es02.key
- xpack.security.http.ssl.certificate=certs/es02/es02.crt
Expand All @@ -97,23 +103,16 @@ services:
- xpack.security.transport.ssl.certificate=certs/es02/es02.crt
- xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
- xpack.security.transport.ssl.verification_mode=certificate
- cluster.name="docker-cluster"
- network.host="0.0.0.0"
- xpack.security.authc.api_key.enabled="true"
- xpack.license.self_generated.type="trial"
- "ES_JAVA_OPTS=-Xms1G -Xmx1G"
- bootstrap.memory_lock=true
- "ELASTIC_USERNAME=${ELASTICSEARCH_USERNAME}"
- "ELASTIC_PASSWORD=${ELASTICSEARCH_PASSWORD}"
mem_limit: 1073741824 # 1g
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- certs:/usr/share/elasticsearch/config/certs
- ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
ports:
- 127.0.0.1:9201:9200

8 changes: 0 additions & 8 deletions dev-tools/integration/elasticsearch.yml

This file was deleted.

4 changes: 2 additions & 2 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer longPoll.Stop()

// Initial update on checkin, and any user fields that might have changed
err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver, unhealthyReason)
err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver, unhealthyReason, agent.AuditUnenrolledReason != "")
if err != nil {
zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed")
}
Expand Down Expand Up @@ -371,7 +371,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
zlog.Trace().Msg("fire long poll")
break LOOP
case <-tick.C:
err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason)
err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false)
if err != nil {
zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed")
}
Expand Down
42 changes: 30 additions & 12 deletions internal/pkg/checkin/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package checkin
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

Expand All @@ -20,6 +21,12 @@ import (

// defaultFlushInterval is how often pending checkin updates are flushed to Elasticsearch as a bulk request.
const defaultFlushInterval = 10 * time.Second

// deleteAuditAttributesScript is the painless script that is run as part of the bulk update.
//
// The script removes the attributes that the audit/unenroll API sets, and is only run if the
// agent checks in with audit_unenrolled_reason set on its doc.
// It is run as part of the same _bulk request as the checkin update, as a separate
// update_by_query previously failed due to version conflicts.
var deleteAuditAttributesScript = []byte(fmt.Sprintf(`{"script": {"lang": "painless", "source": "ctx._source.remove('%s'); ctx._source.remove('%s'); ctx._source.remove('%s')"}}`, dl.FieldAuditUnenrolledReason, dl.FieldAuditUnenrolledTime, dl.FieldUnenrolledAt))

// optionsT holds the configurable settings of the checkin Bulk processor;
// values are set through Opt functions such as WithFlushInterval.
type optionsT struct {
	flushInterval time.Duration // interval between bulk flushes to Elasticsearch
}
Expand All @@ -33,10 +40,11 @@ func WithFlushInterval(d time.Duration) Opt {
}

// extraT carries the optional checkin fields beyond the common timestamp-only
// update. It is allocated only when at least one of its fields is set, to keep
// the memory footprint of the typical pending entry small.
//
// NOTE: the visible span is a rendered diff that interleaves the old and new
// field lists (duplicating meta/seqNo/ver/components); this is the
// reconstructed post-change struct.
type extraT struct {
	meta        []byte    // raw local_metadata from the checkin request
	seqNo       sqn.SeqNo // action sequence number reported by the agent
	ver         string    // agent version, if changed
	components  []byte    // raw components from the checkin request
	deleteAudit bool      // remove audit/unenroll attributes from the agent doc on flush
}

// Minimize the size of this structure.
Expand Down Expand Up @@ -102,17 +110,18 @@ func (bc *Bulk) timestamp() string {
// The pending agents are sent to elasticsearch as a bulk update at each flush interval.
// NOTE: If Checkin is called after Run has returned it will just add the entry to the pending map and not do any operations, this may occur when the fleet-server is shutting down.
// WARNING: Bulk will take ownership of fields, so do not use after passing in.
func (bc *Bulk) CheckIn(id string, status string, message string, meta []byte, components []byte, seqno sqn.SeqNo, newVer string, unhealthyReason *[]string) error {
func (bc *Bulk) CheckIn(id string, status string, message string, meta []byte, components []byte, seqno sqn.SeqNo, newVer string, unhealthyReason *[]string, deleteAudit bool) error {
// Separate out the extra data to minimize
// the memory footprint of the 90% case of just
// updating the timestamp.
var extra *extraT
if meta != nil || seqno.IsSet() || newVer != "" || components != nil {
if meta != nil || seqno.IsSet() || newVer != "" || components != nil || deleteAudit {
extra = &extraT{
meta: meta,
seqNo: seqno,
ver: newVer,
components: components,
meta: meta,
seqNo: seqno,
ver: newVer,
components: components,
deleteAudit: deleteAudit,
}
}

Expand Down Expand Up @@ -167,7 +176,8 @@ func (bc *Bulk) flush(ctx context.Context) error {
return nil
}

updates := make([]bulk.MultiOp, 0, len(pending))
// make cap pending*2 in case all checked in agents have audit/unenroll attributes that must be removed.
updates := make([]bulk.MultiOp, 0, len(pending)*2)
michel-laterman marked this conversation as resolved.
Show resolved Hide resolved

simpleCache := make(map[pendingT][]byte)

Expand All @@ -182,7 +192,6 @@ func (bc *Bulk) flush(ctx context.Context) error {
// JSON body containing just the timestamp updates.
var body []byte
if pendingData.extra == nil {

var ok bool
body, ok = simpleCache[pendingData]
if !ok {
Expand Down Expand Up @@ -240,6 +249,15 @@ func (bc *Bulk) flush(ctx context.Context) error {
if body, err = fields.Marshal(); err != nil {
return err
}

// If deleteAudit is set on the agent make sure the script is part of the request.
if pendingData.extra.deleteAudit {
updates = append(updates, bulk.MultiOp{
ID: id,
Body: deleteAuditAttributesScript,
Index: dl.FleetAgents,
blakerouse marked this conversation as resolved.
Show resolved Hide resolved
})
}
}

updates = append(updates, bulk.MultiOp{
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/checkin/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestBulkSimple(t *testing.T) {
mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once()
bc := NewBulk(mockBulk)

if err := bc.CheckIn(c.id, c.status, c.message, c.meta, c.components, c.seqno, c.ver, c.unhealthyReason); err != nil {
if err := bc.CheckIn(c.id, c.status, c.message, c.meta, c.components, c.seqno, c.ver, c.unhealthyReason, false); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -228,7 +228,7 @@ func benchmarkBulk(n int, b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for _, id := range ids {
err := bc.CheckIn(id, "", "", nil, nil, nil, "", nil)
err := bc.CheckIn(id, "", "", nil, nil, nil, "", nil, false)
if err != nil {
b.Fatal(err)
}
Expand All @@ -254,7 +254,7 @@ func benchmarkFlush(n int, b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
for _, id := range ids {
err := bc.CheckIn(id, "", "", nil, nil, nil, "", nil)
err := bc.CheckIn(id, "", "", nil, nil, nil, "", nil, false)
if err != nil {
b.Fatal(err)
}
Expand Down
42 changes: 42 additions & 0 deletions internal/pkg/server/fleet_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1490,4 +1490,46 @@ func Test_SmokeTest_AuditUnenroll(t *testing.T) {
require.NoError(t, err)
require.Equal(t, http.StatusConflict, res.StatusCode)
res.Body.Close()

t.Logf("Fake a checkin for agent %s", id)
req, err = http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+id+"/checkin", strings.NewReader(checkinBody))
require.NoError(t, err)
req.Header.Set("Authorization", "ApiKey "+key)
req.Header.Set("User-Agent", "elastic agent "+serverVersion)
req.Header.Set("Content-Type", "application/json")
res, err = cli.Do(req)
require.NoError(t, err)

require.Equal(t, http.StatusOK, res.StatusCode)
t.Log("Checkin successful, verify body")
p, _ := io.ReadAll(res.Body)
res.Body.Close()
var obj map[string]interface{}
err = json.Unmarshal(p, &obj)
require.NoError(t, err)

require.Eventually(t, func() bool {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:9200/.fleet-agents/_doc/"+id, nil)
require.NoError(t, err)
req.SetBasicAuth("elastic", "changeme")
res, err := cli.Do(req)
require.NoError(t, err)
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return false
}
p, err := io.ReadAll(res.Body)
require.NoError(t, err)
var tmp map[string]interface{}
err = json.Unmarshal(p, &tmp)
require.NoError(t, err)
o, ok := tmp["_source"]
require.Truef(t, ok, "expected to find _source in: %v", tmp)
obj, ok := o.(map[string]interface{})
require.Truef(t, ok, "expected _source to be an object, was: %T", o)
_, ok = obj["audit_unenrolled_reason"]
return !ok
}, time.Second*20, time.Second, "agent document should not have audit_unenrolled_reason attribute")
cancel()
srv.waitExit() //nolint:errcheck // test case
}
24 changes: 24 additions & 0 deletions testing/e2e/api_version/client_api_current.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,28 @@ func (tester *ClientAPITester) TestEnrollAuditUnenroll() {
tester.T().Logf("audit/unenroll endpoint for agent: %s should return conflict", agentID)
status = tester.AuditUnenroll(ctx, agentKey, agentID, api.Uninstall, now)
tester.Require().Equal(http.StatusConflict, status)

tester.T().Logf("test checkin agent: %s", agentID)
ackToken, actions, statusCode := tester.Checkin(ctx, agentKey, agentID, nil, nil, nil)
tester.Require().Equal(http.StatusOK, statusCode, "Expected status code 200 for successful checkin")

// verify that audit_unenrolled_reason attribute does not exist in agent doc
tester.Require().Eventually(func() bool {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, tester.ESHosts+"/.fleet-agents/_doc/"+agentID, nil)
tester.Require().NoError(err)
req.SetBasicAuth(tester.ElasticUser, tester.ElasticPass)
res, err := tester.Client.Do(req)
tester.Require().NoError(err)
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return false
}
var obj struct {
source map[string]interface{} `json:"_source"`
}
err = json.NewDecoder(res.Body).Decode(&obj)
tester.Require().NoError(err)
_, ok := obj.source["audit_unenrolled_reason"]
return !ok
}, time.Second*20, time.Second, "agent document in elasticsearch should not have audit_unenrolled_reason attribute")
}
Loading