Skip to content

Commit

Permalink
Switch to the new _fleet/_fleet_search and _fleet/_fleet_msearch Elas…
Browse files Browse the repository at this point in the history
…ticsearch Fleet APIs, remove holes detection and refreshes (#814)

* Switch to the new _fleet/_fleet_search and _fleet/_fleet_msearch Elasticsearch Fleet APIs, remove holes detection and refreshes

* Switch to the new _fleet/_fleet_msearch and _fleet/_fleet_search Fleet APIs
  endpoints for the searches that required refreshes and wait for
  checkpoints. The new API handles refreshes and checkpoints waits.
* Separate queues for _msearch and _fleet_msearch, to avoid delays on
  searches without checkpoints wait. Use _fleet/_fleet_msearch endpoint if search is requested with
  wait_for_checkpoints. Use _fleet/_fleet_search for the monitor hits
  fetch.
* Had to copy over the search and msearch wrappers from go-elasticsearch
  library and customize them for _fleet_search and _fleet_msearch.
  These could be removed once the library is updated for these new
  endpoints.
* Removed the holes detection and refresh op code as it's not longer
  used.
  • Loading branch information
aleksmaus authored Oct 29, 2021
1 parent b05e403 commit a2fb073
Show file tree
Hide file tree
Showing 15 changed files with 1,242 additions and 239 deletions.
2 changes: 2 additions & 0 deletions internal/pkg/bulk/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
ActionUpdate
ActionRead
ActionSearch
ActionFleetSearch
)

var actionStrings = []string{
Expand All @@ -54,6 +55,7 @@ var actionStrings = []string{
"update",
"read",
"search",
"fleet_search",
}

func (a actionT) String() string {
Expand Down
4 changes: 3 additions & 1 deletion internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func blkToQueueType(blk *bulkT) queueType {
switch blk.action {
case ActionSearch:
queueIdx = kQueueSearch
case ActionFleetSearch:
queueIdx = kQueueFleetSearch
case ActionRead:
if forceRefresh {
queueIdx = kQueueRefreshRead
Expand Down Expand Up @@ -272,7 +274,7 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu
switch queue.ty {
case kQueueRead, kQueueRefreshRead:
err = b.flushRead(ctx, queue)
case kQueueSearch:
case kQueueSearch, kQueueFleetSearch:
err = b.flushSearch(ctx, queue)
default:
err = b.flushBulk(ctx, queue)
Expand Down
61 changes: 48 additions & 13 deletions internal/pkg/bulk/opSearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/mailru/easyjson"
"github.com/rs/zerolog/log"
Expand All @@ -23,13 +24,19 @@ func (b *Bulker) Search(ctx context.Context, index string, body []byte, opts ...
opt = b.parseOpts(opts...)
}

blk := b.newBlk(ActionSearch, opt)
action := ActionSearch

// Use /_fleet/_fleet_msearch fleet plugin endpoint if need to wait for checkpoints
if len(opt.WaitForCheckpoints) > 0 {
action = ActionFleetSearch
}
blk := b.newBlk(action, opt)

// Serialize request
const kSlop = 64
blk.buf.Grow(len(body) + kSlop)

if err := b.writeMsearchMeta(&blk.buf, index, opt.Indices); err != nil {
if err := b.writeMsearchMeta(&blk.buf, index, opt.Indices, opt.WaitForCheckpoints); err != nil {
return nil, err
}

Expand All @@ -49,11 +56,15 @@ func (b *Bulker) Search(ctx context.Context, index string, body []byte, opts ...
return &es.ResultT{HitsT: r.Hits, Aggregations: r.Aggregations}, nil
}

func (b *Bulker) writeMsearchMeta(buf *Buf, index string, moreIndices []string) error {
func (b *Bulker) writeMsearchMeta(buf *Buf, index string, moreIndices []string, checkpoints []int64) error {
if err := b.validateIndex(index); err != nil {
return err
}

needComma := true

buf.WriteString("{")

if len(moreIndices) > 0 {
if err := b.validateIndices(moreIndices); err != nil {
return err
Expand All @@ -62,21 +73,31 @@ func (b *Bulker) writeMsearchMeta(buf *Buf, index string, moreIndices []string)
indices := []string{index}
indices = append(indices, moreIndices...)

buf.WriteString(`{"index": `)
buf.WriteString(`"index": `)
if d, err := json.Marshal(indices); err != nil {
return err
} else {
buf.Write(d)
}
buf.WriteString("}\n")
} else if len(index) == 0 {
buf.WriteString("{ }\n")
} else {
buf.WriteString(`{"index": "`)
} else if index != "" {
buf.WriteString(`"index": "`)
buf.WriteString(index)
buf.WriteString("\"}\n")
buf.WriteString("\"")
} else {
needComma = false
}

if len(checkpoints) > 0 {
if needComma {
buf.WriteString(`,`)
}
buf.WriteString(` "wait_for_checkpoints": `)
// Write array as string, example: [1,2,3]
buf.WriteString(sqn.SeqNo(checkpoints).JSONString())
}

buf.WriteString("}\n")

return nil
}

Expand Down Expand Up @@ -108,10 +129,24 @@ func (b *Bulker) flushSearch(ctx context.Context, queue queueT) error {
}

// Do actual bulk request; and send response on chan
req := esapi.MsearchRequest{
Body: bytes.NewReader(buf.Bytes()),
var (
res *esapi.Response
err error
)

if queue.ty == kQueueFleetSearch {
// Using custom _fleet/_fleet_msearch, possibly temporary
// Replace with regular _msearch if _fleet/_fleet_msearch implementation merges with _msearch
req := es.FleetMsearchRequest{
Body: bytes.NewReader(buf.Bytes()),
}
res, err = req.Do(ctx, b.es)
} else {
req := esapi.MsearchRequest{
Body: bytes.NewReader(buf.Bytes()),
}
res, err = req.Do(ctx, b.es)
}
res, err := req.Do(ctx, b.es)

if err != nil {
return err
Expand Down
17 changes: 13 additions & 4 deletions internal/pkg/bulk/opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
package bulk

import (
"github.com/rs/zerolog"
"strconv"
"time"

"github.com/rs/zerolog"

"github.com/elastic/fleet-server/v7/internal/pkg/config"
)

//-----
// Transaction options

type optionsT struct {
Refresh bool
RetryOnConflict string
Indices []string
Refresh bool
RetryOnConflict string
Indices []string
WaitForCheckpoints []int64
}

type Opt func(*optionsT)
Expand All @@ -42,6 +44,13 @@ func WithIndex(idx string) Opt {
}
}

// Applicable to _fleet_msearch, wait_for_checkpoints parameters
func WithWaitForCheckpoints(checkpoints []int64) Opt {
return func(opt *optionsT) {
opt.WaitForCheckpoints = checkpoints
}
}

//-----
// Bulk API options

Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/bulk/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
kQueueBulk queueType = iota
kQueueRead
kQueueSearch
kQueueFleetSearch
kQueueRefreshBulk
kQueueRefreshRead
kNumQueues
Expand All @@ -30,6 +31,8 @@ func (q queueT) Type() string {
return "read"
case kQueueSearch:
return "search"
case kQueueFleetSearch:
return "fleetSearch"
case kQueueRefreshBulk:
return "refreshBulk"
case kQueueRefreshRead:
Expand Down
31 changes: 10 additions & 21 deletions internal/pkg/dl/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ func FindAction(ctx context.Context, bulker bulk.Bulk, id string, opts ...Option
o := newOption(FleetActions, opts...)
return findActions(ctx, bulker, QueryAction, o.indexName, map[string]interface{}{
FieldActionId: id,
})
}

func FindActions(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, params map[string]interface{}) ([]model.Action, error) {
return findActions(ctx, bulker, tmpl, FleetActions, params)
}, nil)
}

func FindAgentActions(ctx context.Context, bulker bulk.Bulk, minSeqNo, maxSeqNo sqn.SeqNo, agentId string) ([]model.Action, error) {
Expand All @@ -94,27 +90,20 @@ func FindAgentActions(ctx context.Context, bulker bulk.Bulk, minSeqNo, maxSeqNo
FieldAgents: []string{agentId},
}

res, err := findActionsHits(ctx, bulker, QueryAgentActions, index, params)
res, err := findActionsHits(ctx, bulker, QueryAgentActions, index, params, maxSeqNo)
if err != nil || res == nil {
return nil, err
}

if es.HasHoles(minSeqNo, res.Hits) {
err = es.Refresh(ctx, bulker.Client(), index)
if err != nil {
log.Error().Err(err).Msg("failed to refresh index")
}
res, err := findActionsHits(ctx, bulker, QueryAgentActions, index, params)
if err != nil || res == nil {
return nil, err
}
}

return hitsToActions(res.Hits)
}

func findActionsHits(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, index string, params map[string]interface{}) (*es.HitsT, error) {
res, err := Search(ctx, bulker, tmpl, index, params)
func findActionsHits(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, index string, params map[string]interface{}, seqNos []int64) (*es.HitsT, error) {
var ops []bulk.Opt
if len(seqNos) > 0 {
ops = append(ops, bulk.WithWaitForCheckpoints(seqNos))
}
res, err := Search(ctx, bulker, tmpl, index, params, ops...)
if err != nil {
if errors.Is(err, es.ErrIndexNotFound) {
log.Debug().Str("index", index).Msg(es.ErrIndexNotFound.Error())
Expand All @@ -125,8 +114,8 @@ func findActionsHits(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, inde
return res, nil
}

func findActions(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, index string, params map[string]interface{}) ([]model.Action, error) {
res, err := findActionsHits(ctx, bulker, tmpl, index, params)
func findActions(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, index string, params map[string]interface{}, seqNos []int64) ([]model.Action, error) {
res, err := findActionsHits(ctx, bulker, tmpl, index, params, seqNos)
if err != nil || res == nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/dl/actions_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestSearchActionsQuery(t *testing.T) {
FieldSeqNo: -1,
FieldMaxSeqNo: len(actions),
FieldExpiration: now,
})
}, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/dl/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/es"
)

func Search(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, index string, params map[string]interface{}) (*es.HitsT, error) {
func Search(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, index string, params map[string]interface{}, opts ...bulk.Opt) (*es.HitsT, error) {
query, err := tmpl.Render(params)
if err != nil {
return nil, err
}

res, err := bulker.Search(ctx, index, query)
res, err := bulker.Search(ctx, index, query, opts...)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit a2fb073

Please sign in to comment.