Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/7.x' into mergify/bp/7.x/pr-187
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksmaus committed Mar 30, 2021
2 parents d5ca9d9 + 23bb5b3 commit 8953220
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 37 deletions.
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ Third party libraries used by the Elastic Beats project:

--------------------------------------------------------------------------------
Dependency : github.com/aleksmaus/generate
Version: v0.0.0-20201213151810-c5bc68a6a42f
Version: v0.0.0-20210326194607-c630e07a2742
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/aleksmaus/[email protected]20201213151810-c5bc68a6a42f/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/aleksmaus/[email protected]20210326194607-c630e07a2742/LICENSE.txt:

MIT License

Expand Down
7 changes: 4 additions & 3 deletions cmd/fleet/bulkCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"

"github.com/rs/zerolog/log"
)
Expand All @@ -22,7 +23,7 @@ const kBulkCheckinFlushInterval = 10 * time.Second

type PendingData struct {
fields Fields
seqNo int64
seqNo sqn.SeqNo
}

type BulkCheckin struct {
Expand All @@ -38,7 +39,7 @@ func NewBulkCheckin(bulker bulk.Bulk) *BulkCheckin {
}
}

func (bc *BulkCheckin) CheckIn(id string, fields Fields, seqno int64) error {
func (bc *BulkCheckin) CheckIn(id string, fields Fields, seqno sqn.SeqNo) error {

if fields == nil {
fields = make(Fields)
Expand Down Expand Up @@ -93,7 +94,7 @@ func (bc *BulkCheckin) flush(ctx context.Context) error {
for id, pendingData := range pending {
doc := pendingData.fields
doc[dl.FieldUpdatedAt] = time.Now().UTC().Format(time.RFC3339)
if pendingData.seqNo >= 0 {
if pendingData.seqNo.IsSet() {
doc[dl.FieldActionSeqNo] = pendingData.seqNo
}

Expand Down
11 changes: 4 additions & 7 deletions cmd/fleet/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
<<<<<<< HEAD
=======
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
>>>>>>> a743bad... Indexing permissions as part of the Elastic Agent policy (#187)

"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -205,7 +202,7 @@ func (ct *CheckinT) _handleCheckin(w http.ResponseWriter, r *http.Request, id st
}

// Resolve AckToken from request, fallback on the agent record
func (ct *CheckinT) resolveSeqNo(ctx context.Context, req CheckinRequest, agent *model.Agent) (seqno int64, err error) {
func (ct *CheckinT) resolveSeqNo(ctx context.Context, req CheckinRequest, agent *model.Agent) (seqno sqn.SeqNo, err error) {
// Resolve AckToken from request, fallback on the agent record
ackToken := req.AckToken
seqno = agent.ActionSeqNo
Expand All @@ -221,16 +218,16 @@ func (ct *CheckinT) resolveSeqNo(ctx context.Context, req CheckinRequest, agent
return
}
}
seqno = sn
seqno = []int64{sn}
}
return seqno, nil
}

func (ct *CheckinT) fetchAgentPendingActions(ctx context.Context, seqno int64, agentId string) ([]model.Action, error) {
func (ct *CheckinT) fetchAgentPendingActions(ctx context.Context, seqno sqn.SeqNo, agentId string) ([]model.Action, error) {
now := time.Now().UTC().Format(time.RFC3339)

return dl.FindActions(ctx, ct.bulker, dl.QueryAgentActions, map[string]interface{}{
dl.FieldSeqNo: seqno,
dl.FieldSeqNo: seqno.Get(0),
dl.FieldMaxSeqNo: ct.gcp.GetCheckpoint(),
dl.FieldExpiration: now,
dl.FieldAgents: []string{agentId},
Expand Down
13 changes: 1 addition & 12 deletions cmd/fleet/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"

"github.com/elastic/go-elasticsearch/v8"
"github.com/gofrs/uuid"
Expand Down Expand Up @@ -186,25 +187,13 @@ func _enroll(ctx context.Context, bulker bulk.Bulk, c cache.Cache, req EnrollReq
}

agentData := model.Agent{
<<<<<<< HEAD
Active: true,
PolicyId: erec.PolicyId,
Type: req.Type,
EnrolledAt: now.UTC().Format(time.RFC3339),
LocalMetadata: localMeta,
AccessApiKeyId: accessApiKey.Id,
DefaultApiKeyId: defaultOutputApiKey.Id,
DefaultApiKey: defaultOutputApiKey.Agent(),
ActionSeqNo: dl.UndefinedSeqNo,
=======
Active: true,
PolicyId: erec.PolicyId,
Type: req.Type,
EnrolledAt: now.UTC().Format(time.RFC3339),
LocalMetadata: localMeta,
AccessApiKeyId: accessApiKey.Id,
ActionSeqNo: []int64{sqn.UndefinedSeqNo},
>>>>>>> a743bad... Indexing permissions as part of the Elastic Agent policy (#187)
}

err = createFleetAgent(ctx, bulker, agentId, agentData)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/elastic/fleet-server/v7
go 1.15

require (
github.com/aleksmaus/generate v0.0.0-20201213151810-c5bc68a6a42f
github.com/aleksmaus/generate v0.0.0-20210326194607-c630e07a2742
github.com/dgraph-io/ristretto v0.0.3
github.com/elastic/beats/v7 v7.11.1
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/aleksmaus/generate v0.0.0-20201213151810-c5bc68a6a42f h1:wr9LrxkE1Ai416C/mis1gEDsXrbERHGufCmf7xuYwI4=
github.com/aleksmaus/generate v0.0.0-20201213151810-c5bc68a6a42f/go.mod h1:lvlu2Ij1bLmxB8RUWyw5IQ4/JcLX60eYhLiBmvImnhk=
github.com/aleksmaus/generate v0.0.0-20210326194607-c630e07a2742 h1:lDBhj+4eBCS9tNiJLXrNbvwO5xwkn2/kjvy+tO+PWlI=
github.com/aleksmaus/generate v0.0.0-20210326194607-c630e07a2742/go.mod h1:lvlu2Ij1bLmxB8RUWyw5IQ4/JcLX60eYhLiBmvImnhk=
github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20 h1:7rj9qZ63knnVo2ZeepYHvHuRdG76f3tRUTdIQDzRBeI=
github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20/go.mod h1:cI59GRkC2FRaFYtgbYEqMlgnnfvAwXzjojyZKXwklNg=
github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43 h1:WFwa9pqou0Nb4DdfBOyaBTH0GqLE74Qwdf61E7ITHwQ=
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/action/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"

"github.com/rs/zerolog/log"
)

type Sub struct {
agentId string
seqNo int64
seqNo sqn.SeqNo
ch chan []model.Action
}

Expand Down Expand Up @@ -50,7 +51,7 @@ func (d *Dispatcher) Run(ctx context.Context) (err error) {
}
}

func (d *Dispatcher) Subscribe(agentId string, seqNo int64) *Sub {
func (d *Dispatcher) Subscribe(agentId string, seqNo sqn.SeqNo) *Sub {
cbCh := make(chan []model.Action, 1)

sub := Sub{
Expand Down
9 changes: 3 additions & 6 deletions internal/pkg/dl/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package dl

import "github.com/elastic/fleet-server/v7/internal/pkg/sqn"

// Indices names
const (
FleetActions = ".fleet-actions"
Expand Down Expand Up @@ -42,13 +44,8 @@ const (
FieldIdentifier = "identifier"
)

// Public constants
const (
UndefinedSeqNo = -1
)

// Private constants
const (
defaultSeqNo = UndefinedSeqNo
defaultSeqNo = sqn.UndefinedSeqNo
seqNoPrimaryTerm = "seq_no_primary_term"
)
2 changes: 1 addition & 1 deletion internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions internal/pkg/sqn/sqn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package sqn

import (
"fmt"
"strings"
)

const UndefinedSeqNo = -1

// Abstracts the array of document seq numbers
type SeqNo []int64

func (s SeqNo) String() string {
if len(s) == 0 {
return ""
}
return strings.Join(strings.Fields(strings.Trim(fmt.Sprint([]int64(s)), "[]")), ",")
}

func (s SeqNo) IsSet() bool {
return len(s) > 0 && s[0] >= 0
}

func (s SeqNo) Get(idx int) int64 {
if idx < len(s) {
return s[idx]
}
return UndefinedSeqNo
}
5 changes: 4 additions & 1 deletion model/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@
},
"action_seq_no": {
"description": "The last acknowledged action sequence number for the Elastic Agent",
"type": "integer"
"type": "array",
"items": {
"type": "integer"
}
}
},
"required": [
Expand Down

0 comments on commit 8953220

Please sign in to comment.