Skip to content

Commit

Permalink
Utilize new ES fleet polling API for global checkpoint monitoring (#200)
Browse files Browse the repository at this point in the history
* Utilize fleet polling API for global checkpoint monitoring

* Adjust for API changes, configurable poll timeout

* Update retryDelay to 3 secs

* Update tests for monitor API change

* Adjust to the latest API changes

* Remove fleet indexes bootstrapping for tests, it is done by fleet system index plugin now

* Fix unit tests
  • Loading branch information
aleksmaus authored Apr 15, 2021
1 parent 91af65a commit 13a5550
Show file tree
Hide file tree
Showing 22 changed files with 391 additions and 161 deletions.
4 changes: 2 additions & 2 deletions cmd/fleet/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ func (ct *CheckinT) fetchAgentPendingActions(ctx context.Context, seqno sqn.SeqN
now := time.Now().UTC().Format(time.RFC3339)

return dl.FindActions(ctx, ct.bulker, dl.QueryAgentActions, map[string]interface{}{
dl.FieldSeqNo: seqno.Get(0),
dl.FieldMaxSeqNo: ct.gcp.GetCheckpoint(),
dl.FieldSeqNo: seqno.Value(),
dl.FieldMaxSeqNo: ct.gcp.GetCheckpoint().Value(),
dl.FieldExpiration: now,
dl.FieldAgents: []string{agentId},
})
Expand Down
20 changes: 17 additions & 3 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/coordinator"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
Expand Down Expand Up @@ -505,7 +506,13 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er
// shutdown before the bulker is then cancelled.
bulkCtx, bulkCancel := context.WithCancel(context.Background())
defer bulkCancel()
es, bulker, err := bulk.InitES(bulkCtx, cfg)
esCli, bulker, err := bulk.InitES(bulkCtx, cfg)
if err != nil {
return err
}

// Monitoring es client, longer timeout, no retries
monCli, err := es.NewClient(ctx, cfg, true)
if err != nil {
return err
}
Expand All @@ -514,7 +521,10 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er
g, ctx := errgroup.WithContext(ctx)

// Coordinator policy monitor
pim, err := monitor.New(dl.FleetPolicies, es, monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize))
pim, err := monitor.New(dl.FleetPolicies, esCli, monCli,
monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize),
monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout),
)
if err != nil {
return err
}
Expand All @@ -536,7 +546,11 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er
var ad *action.Dispatcher
var tr *action.TokenResolver

am, err = monitor.NewSimple(dl.FleetActions, es, monitor.WithExpiration(true), monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize))
am, err = monitor.NewSimple(dl.FleetActions, esCli, monCli,
monitor.WithExpiration(true),
monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize),
monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout),
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion dev-tools/integration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {
checkErr(err)

ctx := context.Background()
es, err := es.NewClient(ctx, cfg)
es, err := es.NewClient(ctx, cfg, false)
checkErr(err)

err = esutil.EnsureESIndices(ctx, es)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ const (

func InitES(ctx context.Context, cfg *config.Config, opts ...BulkOpt) (*elasticsearch.Client, Bulk, error) {

es, err := es.NewClient(ctx, cfg)
es, err := es.NewClient(ctx, cfg, false)
if err != nil {
return nil, nil, err
}
Expand Down
12 changes: 8 additions & 4 deletions internal/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func TestConfig(t *testing.T) {
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
},
},
},
Expand Down Expand Up @@ -182,7 +183,8 @@ func TestConfig(t *testing.T) {
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
},
},
},
Expand Down Expand Up @@ -269,7 +271,8 @@ func TestConfig(t *testing.T) {
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
},
},
},
Expand Down Expand Up @@ -356,7 +359,8 @@ func TestConfig(t *testing.T) {
MaxCost: defaultCacheMaxCost,
},
Monitor: Monitor{
FetchSize: defaultFetchSize,
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
},
},
},
Expand Down
9 changes: 7 additions & 2 deletions internal/pkg/config/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@

package config

import "time"

const (
defaultFetchSize = 1000
defaultFetchSize = 1000
defaultPollTimeout = 5 * time.Minute
)

type Monitor struct {
FetchSize int `config:"fetch_size"`
FetchSize int `config:"fetch_size"`
PollTimeout time.Duration `config:"poll_timeout"`
}

func (m *Monitor) InitDefaults() {
m.FetchSize = defaultFetchSize
m.PollTimeout = defaultPollTimeout
}
18 changes: 17 additions & 1 deletion internal/pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

// The timeout would be driven by the server for long poll.
// Giving it some sane long value.
const httpTransportLongPollTimeout = 10 * time.Minute

var hasScheme = regexp.MustCompile(`^([a-z][a-z0-9+\-.]*)://`)

// Elasticsearch is the configuration for elasticsearch.
Expand Down Expand Up @@ -77,7 +81,7 @@ func (c *Elasticsearch) Validate() error {
}

// ToESConfig converts the configuration object into the config for the elasticsearch client.
func (c *Elasticsearch) ToESConfig() (elasticsearch.Config, error) {
func (c *Elasticsearch) ToESConfig(longPoll bool) (elasticsearch.Config, error) {
// build the addresses
addrs := make([]string, len(c.Hosts))
for i, host := range c.Hosts {
Expand All @@ -104,6 +108,17 @@ func (c *Elasticsearch) ToESConfig() (elasticsearch.Config, error) {
ResponseHeaderTimeout: c.Timeout,
ExpectContinueTimeout: 1 * time.Second,
}

disableRetry := false

if longPoll {
httpTransport.IdleConnTimeout = httpTransportLongPollTimeout
httpTransport.ResponseHeaderTimeout = httpTransportLongPollTimeout

// no retries for long poll monitoring
disableRetry = true
}

if c.TLS != nil && c.TLS.IsEnabled() {
tls, err := tlscommon.LoadTLSConfig(c.TLS)
if err != nil {
Expand Down Expand Up @@ -136,6 +151,7 @@ func (c *Elasticsearch) ToESConfig() (elasticsearch.Config, error) {
Header: h,
Transport: httpTransport,
MaxRetries: c.MaxRetries,
DisableRetry: disableRetry,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/config/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestToESConfig(t *testing.T) {
cmpopts.IgnoreUnexported(tls.Config{}),
}
t.Run(name, func(t *testing.T) {
res, err := test.cfg.ToESConfig()
res, err := test.cfg.ToESConfig(false)
require.NoError(t, err)
test.result.Header.Set("X-elastic-product-origin", "fleet")
if !assert.True(t, cmp.Equal(test.result, res, copts...)) {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/coordinator/monitor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestMonitorLeadership(t *testing.T) {
serversIndex := ftesting.SetupIndex(bulkCtx, t, bulker, es.MappingServer)
policiesIndex := ftesting.SetupIndex(bulkCtx, t, bulker, es.MappingPolicy)
leadersIndex := ftesting.SetupIndex(bulkCtx, t, bulker, es.MappingPolicyLeader)
pim, err := monitor.New(policiesIndex, bulker.Client())
pim, err := monitor.New(policiesIndex, bulker.Client(), bulker.Client())
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/rs/zerolog/log"
)

func NewClient(ctx context.Context, cfg *config.Config) (*elasticsearch.Client, error) {
escfg, err := cfg.Output.Elasticsearch.ToESConfig()
func NewClient(ctx context.Context, cfg *config.Config, longPoll bool) (*elasticsearch.Client, error) {
escfg, err := cfg.Output.Elasticsearch.ToESConfig(longPoll)
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/es/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ type ErrElastic struct {
func (e *ErrElastic) Unwrap() error {
if e.Type == "index_not_found_exception" {
return ErrIndexNotFound
} else if e.Type == "timeout_exception" {
return ErrTimeout
}

return nil
}

Expand All @@ -35,6 +38,8 @@ var (
ErrElasticNotFound = errors.New("elastic not found")
ErrInvalidBody = errors.New("invalid body")
ErrIndexNotFound = errors.New("index not found")
ErrTimeout = errors.New("timeout")
ErrNotFound = errors.New("not found")
)

func TranslateError(status int, e ErrorT) error {
Expand Down
164 changes: 164 additions & 0 deletions internal/pkg/es/fleet_global_checkpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package es

import (
"context"
"net/http"
"strconv"
"strings"
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

// The wrapper for the new _fleet global_checkpoints that is not the part of the
// standard client library at the moment.
// The shape mimics the official client API and should be easy drop-in replacement in the future.
// This should be replaced the official client library when/if the new API makes it in.

func NewGlobalCheckpointsRequest(t esapi.Transport) GlobalCheckpoints {
return func(o ...func(*GlobalCheckpointsRequest)) (*esapi.Response, error) {
var r = GlobalCheckpointsRequest{}
for _, f := range o {
f(&r)
}
return r.Do(r.ctx, t)
}
}

// Copied from the official client
func formatDuration(d time.Duration) string {
if d < time.Millisecond {
return strconv.FormatInt(int64(d), 10) + "nanos"
}
return strconv.FormatInt(int64(d)/int64(time.Millisecond), 10) + "ms"
}

type GlobalCheckpoints func(o ...func(*GlobalCheckpointsRequest)) (*esapi.Response, error)

// GlobalCheckpointsRequest configures the _fleet API global_checkpoints request.
//
type GlobalCheckpointsRequest struct {
ctx context.Context

Index string
WaitForAdvance *bool
Checkpoints []int64
Timeout time.Duration

Header http.Header
}

// Do executes the request and returns response or error.
//
func (r GlobalCheckpointsRequest) Do(ctx context.Context, transport esapi.Transport) (*esapi.Response, error) {
var (
method string
path strings.Builder
params map[string]string
)

method = "GET"

path.Grow(1 + len(r.Index) + len("/_fleet/global_checkpoints"))
if len(r.Index) > 0 {
path.WriteString("/")
path.WriteString(r.Index)
}
path.WriteString("/_fleet/global_checkpoints")

params = make(map[string]string)

if r.WaitForAdvance != nil {
params["wait_for_advance"] = strconv.FormatBool(*r.WaitForAdvance)
}

if len(r.Checkpoints) > 0 {
seqNo := sqn.SeqNo(r.Checkpoints)
params["checkpoints"] = seqNo.String()
}

if r.Timeout != 0 {
params["timeout"] = formatDuration(r.Timeout)
}

req, err := http.NewRequest(method, path.String(), nil)
if err != nil {
return nil, err
}

if len(params) > 0 {
q := req.URL.Query()
for k, v := range params {
q.Set(k, v)
}
req.URL.RawQuery = q.Encode()
}

if len(r.Header) > 0 {
if len(req.Header) == 0 {
req.Header = r.Header
} else {
for k, vv := range r.Header {
for _, v := range vv {
req.Header.Add(k, v)
}
}
}
}

if ctx != nil {
req = req.WithContext(ctx)
}

res, err := transport.Perform(req)
if err != nil {
return nil, err
}

response := esapi.Response{
StatusCode: res.StatusCode,
Body: res.Body,
Header: res.Header,
}

return &response, nil
}

// WithContext sets the request context.
//
func (f GlobalCheckpoints) WithContext(v context.Context) func(*GlobalCheckpointsRequest) {
return func(r *GlobalCheckpointsRequest) {
r.ctx = v
}
}

// WithIndex - an index name
//
func (f GlobalCheckpoints) WithIndex(index string) func(*GlobalCheckpointsRequest) {
return func(r *GlobalCheckpointsRequest) {
r.Index = index
}
}

func (f GlobalCheckpoints) WithWaitForAdvance(v bool) func(*GlobalCheckpointsRequest) {
return func(r *GlobalCheckpointsRequest) {
r.WaitForAdvance = &v
}
}

func (f GlobalCheckpoints) WithCheckpoints(checkpoints []int64) func(*GlobalCheckpointsRequest) {
return func(r *GlobalCheckpointsRequest) {
r.Checkpoints = checkpoints
}
}

func (f GlobalCheckpoints) WithTimeout(to time.Duration) func(*GlobalCheckpointsRequest) {
return func(r *GlobalCheckpointsRequest) {
r.Timeout = to
}
}
Loading

0 comments on commit 13a5550

Please sign in to comment.