From 13a5550c51ea65b491be3e9df21f55dd6071a1bb Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 15 Apr 2021 13:28:41 -0400 Subject: [PATCH] Utilize new ES fleet polling API for global checkpoint monitoring (#200) * Utilize fleet polling API for global checkpoint monitoring * Adjust for API changes, configurable poll timeout * Update retryDelay to 3 secs * Update tests for monitor API change * Adjust to the latest API changes * Remove fleet indexes bootstrapping for tests, it is done by fleet system index plugin now * Fix unit tests --- cmd/fleet/handleCheckin.go | 4 +- cmd/fleet/main.go | 20 ++- dev-tools/integration/main.go | 2 +- internal/pkg/bulk/bulk.go | 2 +- internal/pkg/config/config_test.go | 12 +- internal/pkg/config/monitor.go | 9 +- internal/pkg/config/output.go | 18 +- internal/pkg/config/output_test.go | 2 +- .../coordinator/monitor_integration_test.go | 2 +- internal/pkg/es/client.go | 4 +- internal/pkg/es/error.go | 5 + internal/pkg/es/fleet_global_checkpoints.go | 164 ++++++++++++++++++ internal/pkg/monitor/global_checkpoint.go | 94 +++++----- internal/pkg/monitor/mock/monitor.go | 9 +- internal/pkg/monitor/monitor.go | 150 ++++++++-------- .../pkg/monitor/monitor_integration_test.go | 6 +- internal/pkg/monitor/subscription_monitor.go | 10 +- .../subscription_monitor_integration_test.go | 4 +- .../pkg/policy/monitor_integration_test.go | 4 +- internal/pkg/policy/monitor_test.go | 2 + internal/pkg/sqn/sqn.go | 21 ++- internal/pkg/testing/esutil/bootstrap.go | 8 +- 22 files changed, 391 insertions(+), 161 deletions(-) create mode 100644 internal/pkg/es/fleet_global_checkpoints.go diff --git a/cmd/fleet/handleCheckin.go b/cmd/fleet/handleCheckin.go index 07761fe34..d1a7e4dda 100644 --- a/cmd/fleet/handleCheckin.go +++ b/cmd/fleet/handleCheckin.go @@ -313,8 +313,8 @@ func (ct *CheckinT) fetchAgentPendingActions(ctx context.Context, seqno sqn.SeqN now := time.Now().UTC().Format(time.RFC3339) return dl.FindActions(ctx, ct.bulker, dl.QueryAgentActions, map[string]interface{}{ - dl.FieldSeqNo: seqno.Get(0), - dl.FieldMaxSeqNo: ct.gcp.GetCheckpoint(), + dl.FieldSeqNo: seqno.Value(), + dl.FieldMaxSeqNo: ct.gcp.GetCheckpoint().Value(), dl.FieldExpiration: now, dl.FieldAgents: []string{agentId}, }) diff --git a/cmd/fleet/main.go b/cmd/fleet/main.go index 2590b0334..64d6ccc87 100644 --- a/cmd/fleet/main.go +++ b/cmd/fleet/main.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/coordinator" "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" "github.com/elastic/fleet-server/v7/internal/pkg/policy" @@ -505,7 +506,13 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er // shutdown before the bulker is then cancelled. bulkCtx, bulkCancel := context.WithCancel(context.Background()) defer bulkCancel() - es, bulker, err := bulk.InitES(bulkCtx, cfg) + esCli, bulker, err := bulk.InitES(bulkCtx, cfg) + if err != nil { + return err + } + + // Monitoring es client, longer timeout, no retries + monCli, err := es.NewClient(ctx, cfg, true) if err != nil { return err } @@ -514,7 +521,10 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er g, ctx := errgroup.WithContext(ctx) // Coordinator policy monitor - pim, err := monitor.New(dl.FleetPolicies, es, monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize)) + pim, err := monitor.New(dl.FleetPolicies, esCli, monCli, + monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize), + monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout), + ) if err != nil { return err } @@ -536,7 +546,11 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er var ad *action.Dispatcher var tr *action.TokenResolver - am, err = monitor.NewSimple(dl.FleetActions, es, monitor.WithExpiration(true), monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize)) + am, err = monitor.NewSimple(dl.FleetActions, esCli, monCli, + monitor.WithExpiration(true), + monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize), + monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout), + ) if err != nil { return err } diff --git a/dev-tools/integration/main.go b/dev-tools/integration/main.go index a4e329319..e43ed35d1 100644 --- a/dev-tools/integration/main.go +++ b/dev-tools/integration/main.go @@ -31,7 +31,7 @@ func main() { checkErr(err) ctx := context.Background() - es, err := es.NewClient(ctx, cfg) + es, err := es.NewClient(ctx, cfg, false) checkErr(err) err = esutil.EnsureESIndices(ctx, es) diff --git a/internal/pkg/bulk/bulk.go b/internal/pkg/bulk/bulk.go index 2b98ef49f..f47063028 100644 --- a/internal/pkg/bulk/bulk.go +++ b/internal/pkg/bulk/bulk.go @@ -87,7 +87,7 @@ const ( func InitES(ctx context.Context, cfg *config.Config, opts ...BulkOpt) (*elasticsearch.Client, Bulk, error) { - es, err := es.NewClient(ctx, cfg) + es, err := es.NewClient(ctx, cfg, false) if err != nil { return nil, nil, err } diff --git a/internal/pkg/config/config_test.go b/internal/pkg/config/config_test.go index f50a67c31..c7c00bad3 100644 --- a/internal/pkg/config/config_test.go +++ b/internal/pkg/config/config_test.go @@ -93,7 +93,8 @@ func TestConfig(t *testing.T) { MaxCost: defaultCacheMaxCost, }, Monitor: Monitor{ - FetchSize: defaultFetchSize, + FetchSize: defaultFetchSize, + PollTimeout: defaultPollTimeout, }, }, }, @@ -182,7 +183,8 @@ func TestConfig(t *testing.T) { MaxCost: defaultCacheMaxCost, }, Monitor: Monitor{ - FetchSize: defaultFetchSize, + FetchSize: defaultFetchSize, + PollTimeout: defaultPollTimeout, }, }, }, @@ -269,7 +271,8 @@ func TestConfig(t *testing.T) { MaxCost: defaultCacheMaxCost, }, Monitor: Monitor{ - FetchSize: defaultFetchSize, + FetchSize: defaultFetchSize, + PollTimeout: defaultPollTimeout, }, }, }, @@ -356,7 +359,8 @@ func TestConfig(t *testing.T) { MaxCost: defaultCacheMaxCost, }, Monitor: Monitor{ - FetchSize: defaultFetchSize, + FetchSize: defaultFetchSize, + PollTimeout: defaultPollTimeout, }, }, }, diff --git a/internal/pkg/config/monitor.go b/internal/pkg/config/monitor.go index 1d3f8a31d..e88e8e09a 100644 --- a/internal/pkg/config/monitor.go +++ b/internal/pkg/config/monitor.go @@ -4,14 +4,19 @@ package config +import "time" + const ( - defaultFetchSize = 1000 + defaultFetchSize = 1000 + defaultPollTimeout = 5 * time.Minute ) type Monitor struct { - FetchSize int `config:"fetch_size"` + FetchSize int `config:"fetch_size"` + PollTimeout time.Duration `config:"poll_timeout"` } func (m *Monitor) InitDefaults() { m.FetchSize = defaultFetchSize + m.PollTimeout = defaultPollTimeout } diff --git a/internal/pkg/config/output.go b/internal/pkg/config/output.go index b800f4100..7e6e5205d 100644 --- a/internal/pkg/config/output.go +++ b/internal/pkg/config/output.go @@ -20,6 +20,10 @@ import ( "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) +// The timeout would be driven by the server for long poll. +// Giving it some sane long value. +const httpTransportLongPollTimeout = 10 * time.Minute + var hasScheme = regexp.MustCompile(`^([a-z][a-z0-9+\-.]*)://`) // Elasticsearch is the configuration for elasticsearch. @@ -77,7 +81,7 @@ func (c *Elasticsearch) Validate() error { } // ToESConfig converts the configuration object into the config for the elasticsearch client. -func (c *Elasticsearch) ToESConfig() (elasticsearch.Config, error) { +func (c *Elasticsearch) ToESConfig(longPoll bool) (elasticsearch.Config, error) { // build the addresses addrs := make([]string, len(c.Hosts)) for i, host := range c.Hosts { @@ -104,6 +108,17 @@ func (c *Elasticsearch) ToESConfig() (elasticsearch.Config, error) { ResponseHeaderTimeout: c.Timeout, ExpectContinueTimeout: 1 * time.Second, } + + disableRetry := false + + if longPoll { + httpTransport.IdleConnTimeout = httpTransportLongPollTimeout + httpTransport.ResponseHeaderTimeout = httpTransportLongPollTimeout + + // no retries for long poll monitoring + disableRetry = true + } + if c.TLS != nil && c.TLS.IsEnabled() { tls, err := tlscommon.LoadTLSConfig(c.TLS) if err != nil { @@ -136,6 +151,7 @@ func (c *Elasticsearch) ToESConfig() (elasticsearch.Config, error) { Header: h, Transport: httpTransport, MaxRetries: c.MaxRetries, + DisableRetry: disableRetry, }, nil } diff --git a/internal/pkg/config/output_test.go b/internal/pkg/config/output_test.go index dc29e2457..9f604df35 100644 --- a/internal/pkg/config/output_test.go +++ b/internal/pkg/config/output_test.go @@ -171,7 +171,7 @@ func TestToESConfig(t *testing.T) { cmpopts.IgnoreUnexported(tls.Config{}), } t.Run(name, func(t *testing.T) { - res, err := test.cfg.ToESConfig() + res, err := test.cfg.ToESConfig(false) require.NoError(t, err) test.result.Header.Set("X-elastic-product-origin", "fleet") if !assert.True(t, cmp.Equal(test.result, res, copts...)) { diff --git a/internal/pkg/coordinator/monitor_integration_test.go b/internal/pkg/coordinator/monitor_integration_test.go index 29375e6dd..d039a8abc 100644 --- a/internal/pkg/coordinator/monitor_integration_test.go +++ b/internal/pkg/coordinator/monitor_integration_test.go @@ -36,7 +36,7 @@ func TestMonitorLeadership(t *testing.T) { serversIndex := ftesting.SetupIndex(bulkCtx, t, bulker, es.MappingServer) policiesIndex := ftesting.SetupIndex(bulkCtx, t, bulker, es.MappingPolicy) leadersIndex := ftesting.SetupIndex(bulkCtx, t, bulker, es.MappingPolicyLeader) - pim, err := monitor.New(policiesIndex, bulker.Client()) + pim, err := monitor.New(policiesIndex, bulker.Client(), bulker.Client()) if err != nil { t.Fatal(err) } diff --git a/internal/pkg/es/client.go b/internal/pkg/es/client.go index b2159fd96..792700ef9 100644 --- a/internal/pkg/es/client.go +++ b/internal/pkg/es/client.go @@ -15,8 +15,8 @@ import ( "github.com/rs/zerolog/log" ) -func NewClient(ctx context.Context, cfg *config.Config) (*elasticsearch.Client, error) { - escfg, err := cfg.Output.Elasticsearch.ToESConfig() +func NewClient(ctx context.Context, cfg *config.Config, longPoll bool) (*elasticsearch.Client, error) { + escfg, err := cfg.Output.Elasticsearch.ToESConfig(longPoll) if err != nil { return nil, err } diff --git a/internal/pkg/es/error.go b/internal/pkg/es/error.go index 4966c88f6..008097546 100644 --- a/internal/pkg/es/error.go +++ b/internal/pkg/es/error.go @@ -22,7 +22,10 @@ type ErrElastic struct { func (e *ErrElastic) Unwrap() error { if e.Type == "index_not_found_exception" { return ErrIndexNotFound + } else if e.Type == "timeout_exception" { + return ErrTimeout } + return nil } @@ -35,6 +38,8 @@ var ( ErrElasticNotFound = errors.New("elastic not found") ErrInvalidBody = errors.New("invalid body") ErrIndexNotFound = errors.New("index not found") + ErrTimeout = errors.New("timeout") + ErrNotFound = errors.New("not found") ) func TranslateError(status int, e ErrorT) error { diff --git a/internal/pkg/es/fleet_global_checkpoints.go b/internal/pkg/es/fleet_global_checkpoints.go new file mode 100644 index 000000000..449cc17e6 --- /dev/null +++ b/internal/pkg/es/fleet_global_checkpoints.go @@ -0,0 +1,164 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package es + +import ( + "context" + "net/http" + "strconv" + "strings" + "time" + + "github.com/elastic/fleet-server/v7/internal/pkg/sqn" + "github.com/elastic/go-elasticsearch/v8/esapi" +) + +// The wrapper for the new _fleet global_checkpoints that is not the part of the +// standard client library at the moment. +// The shape mimics the official client API and should be easy drop-in replacement in the future. +// This should be replaced the official client library when/if the new API makes it in. + +func NewGlobalCheckpointsRequest(t esapi.Transport) GlobalCheckpoints { + return func(o ...func(*GlobalCheckpointsRequest)) (*esapi.Response, error) { + var r = GlobalCheckpointsRequest{} + for _, f := range o { + f(&r) + } + return r.Do(r.ctx, t) + } +} + +// Copied from the official client +func formatDuration(d time.Duration) string { + if d < time.Millisecond { + return strconv.FormatInt(int64(d), 10) + "nanos" + } + return strconv.FormatInt(int64(d)/int64(time.Millisecond), 10) + "ms" +} + +type GlobalCheckpoints func(o ...func(*GlobalCheckpointsRequest)) (*esapi.Response, error) + +// GlobalCheckpointsRequest configures the _fleet API global_checkpoints request. +// +type GlobalCheckpointsRequest struct { + ctx context.Context + + Index string + WaitForAdvance *bool + Checkpoints []int64 + Timeout time.Duration + + Header http.Header +} + +// Do executes the request and returns response or error. +// +func (r GlobalCheckpointsRequest) Do(ctx context.Context, transport esapi.Transport) (*esapi.Response, error) { + var ( + method string + path strings.Builder + params map[string]string + ) + + method = "GET" + + path.Grow(1 + len(r.Index) + len("/_fleet/global_checkpoints")) + if len(r.Index) > 0 { + path.WriteString("/") + path.WriteString(r.Index) + } + path.WriteString("/_fleet/global_checkpoints") + + params = make(map[string]string) + + if r.WaitForAdvance != nil { + params["wait_for_advance"] = strconv.FormatBool(*r.WaitForAdvance) + } + + if len(r.Checkpoints) > 0 { + seqNo := sqn.SeqNo(r.Checkpoints) + params["checkpoints"] = seqNo.String() + } + + if r.Timeout != 0 { + params["timeout"] = formatDuration(r.Timeout) + } + + req, err := http.NewRequest(method, path.String(), nil) + if err != nil { + return nil, err + } + + if len(params) > 0 { + q := req.URL.Query() + for k, v := range params { + q.Set(k, v) + } + req.URL.RawQuery = q.Encode() + } + + if len(r.Header) > 0 { + if len(req.Header) == 0 { + req.Header = r.Header + } else { + for k, vv := range r.Header { + for _, v := range vv { + req.Header.Add(k, v) + } + } + } + } + + if ctx != nil { + req = req.WithContext(ctx) + } + + res, err := transport.Perform(req) + if err != nil { + return nil, err + } + + response := esapi.Response{ + StatusCode: res.StatusCode, + Body: res.Body, + Header: res.Header, + } + + return &response, nil +} + +// WithContext sets the request context. +// +func (f GlobalCheckpoints) WithContext(v context.Context) func(*GlobalCheckpointsRequest) { + return func(r *GlobalCheckpointsRequest) { + r.ctx = v + } +} + +// WithIndex - an index name +// +func (f GlobalCheckpoints) WithIndex(index string) func(*GlobalCheckpointsRequest) { + return func(r *GlobalCheckpointsRequest) { + r.Index = index + } +} + +func (f GlobalCheckpoints) WithWaitForAdvance(v bool) func(*GlobalCheckpointsRequest) { + return func(r *GlobalCheckpointsRequest) { + r.WaitForAdvance = &v + } +} + +func (f GlobalCheckpoints) WithCheckpoints(checkpoints []int64) func(*GlobalCheckpointsRequest) { + return func(r *GlobalCheckpointsRequest) { + r.Checkpoints = checkpoints + } +} + +func (f GlobalCheckpoints) WithTimeout(to time.Duration) func(*GlobalCheckpointsRequest) { + return func(r *GlobalCheckpointsRequest) { + r.Timeout = to + } +} diff --git a/internal/pkg/monitor/global_checkpoint.go b/internal/pkg/monitor/global_checkpoint.go index 89f0e8025..56bd598c8 100644 --- a/internal/pkg/monitor/global_checkpoint.go +++ b/internal/pkg/monitor/global_checkpoint.go @@ -8,73 +8,89 @@ import ( "context" "encoding/json" "errors" - "fmt" + "net/http" + "time" - "github.com/elastic/fleet-server/v7/internal/pkg/es" + esh "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/sqn" "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" ) var ErrGlobalCheckpoint = errors.New("global checkpoint error") -type shard struct { - SeqNo struct { - GlobalCheckpoint int64 `json:"global_checkpoint"` - } `json:"seq_no"` -} +// Global checkpoint response +// {"global_checkpoints":[-1]} -type indexStats struct { - Shards map[string][]shard `json:"shards"` +type globalCheckpointsResponse struct { + GlobalCheckpoints []int64 `json:"global_checkpoints"` + TimedOut bool `json:"timed_out"` + Error esh.ErrorT `json:"error,omitempty"` } -type statsResponse struct { - IndexStats map[string]indexStats `json:"indices"` +func queryGlobalCheckpoint(ctx context.Context, es *elasticsearch.Client, index string) (seqno sqn.SeqNo, err error) { + req := esh.NewGlobalCheckpointsRequest(es.Transport) + res, err := req(req.WithContext(ctx), + req.WithIndex(index)) - Error es.ErrorT `json:"error,omitempty"` -} + if err != nil { + return + } + + seqno, err = processGlobalCheckpointResponse(res) + if errors.Is(err, esh.ErrIndexNotFound) { + seqno = sqn.DefaultSeqNo + err = nil + } -func queryGlobalCheckpoint(ctx context.Context, es *elasticsearch.Client, index string) (seqno int64, err error) { - seqno = defaultSeqNo + return seqno, err +} - res, err := es.Indices.Stats( - es.Indices.Stats.WithContext(ctx), - es.Indices.Stats.WithIndex(index), - es.Indices.Stats.WithLevel("shards"), +func waitCheckpointAdvance(ctx context.Context, es *elasticsearch.Client, index string, checkpoint sqn.SeqNo, to time.Duration) (seqno sqn.SeqNo, err error) { + req := esh.NewGlobalCheckpointsRequest(es.Transport) + res, err := req(req.WithContext(ctx), + req.WithIndex(index), + req.WithCheckpoints(checkpoint), + req.WithWaitForAdvance(true), + req.WithTimeout(to), ) if err != nil { return } + return processGlobalCheckpointResponse(res) +} + +func processGlobalCheckpointResponse(res *esapi.Response) (seqno sqn.SeqNo, err error) { defer res.Body.Close() - var sres statsResponse + // Don't parse the payload if timeout + if res.StatusCode == http.StatusGatewayTimeout { + return seqno, esh.ErrTimeout + } + + // Parse payload + var sres globalCheckpointsResponse err = json.NewDecoder(res.Body).Decode(&sres) if err != nil { return } - if len(sres.IndexStats) > 1 { - indices := make([]string, 0, len(sres.IndexStats)) - for k := range sres.IndexStats { - indices = append(indices, k) - } - return seqno, fmt.Errorf("more than one indices found %v, %w", indices, ErrGlobalCheckpoint) + // Check error + err = esh.TranslateError(res.StatusCode, sres.Error) + if err != nil { + return nil, err + } + + if sres.TimedOut { + return nil, esh.ErrTimeout } - if len(sres.IndexStats) > 0 { - // Grab the first and only index stats - var stats indexStats - for _, stats = range sres.IndexStats { - break - } - - if shards, ok := stats.Shards["0"]; ok { - if len(shards) > 0 { - seqno = shards[0].SeqNo.GlobalCheckpoint - } - } + if len(sres.GlobalCheckpoints) == 0 { + return nil, esh.ErrNotFound } - return + return sres.GlobalCheckpoints, nil } diff --git a/internal/pkg/monitor/mock/monitor.go b/internal/pkg/monitor/mock/monitor.go index 47368dcf7..25b268f07 100644 --- a/internal/pkg/monitor/mock/monitor.go +++ b/internal/pkg/monitor/mock/monitor.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" + "github.com/elastic/fleet-server/v7/internal/pkg/sqn" ) var gMockIndexCounter uint64 @@ -26,7 +27,7 @@ func (s *mockSubT) Output() <-chan []es.HitT { } type MockIndexMonitor struct { - checkpoint int64 + checkpoint sqn.SeqNo mut sync.RWMutex subs map[uint64]*mockSubT @@ -35,13 +36,13 @@ type MockIndexMonitor struct { // NewMockIndexMonitor returns a mock monitor. func NewMockIndexMonitor() *MockIndexMonitor { return &MockIndexMonitor{ - checkpoint: -1, + checkpoint: sqn.DefaultSeqNo, subs: make(map[uint64]*mockSubT), } } // GetCheckpoint returns the current checkpoint. -func (m *MockIndexMonitor) GetCheckpoint() int64 { +func (m *MockIndexMonitor) GetCheckpoint() sqn.SeqNo { return m.checkpoint } @@ -85,7 +86,7 @@ func (m *MockIndexMonitor) Notify(ctx context.Context, hits []es.HitT) { sz := len(hits) if sz > 0 { maxVal := hits[sz-1].SeqNo - m.checkpoint = maxVal + m.checkpoint = []int64{maxVal} m.mut.RLock() var wg sync.WaitGroup diff --git a/internal/pkg/monitor/monitor.go b/internal/pkg/monitor/monitor.go index 8d4812bf8..2dd8ac1f3 100644 --- a/internal/pkg/monitor/monitor.go +++ b/internal/pkg/monitor/monitor.go @@ -9,12 +9,14 @@ import ( "context" "encoding/json" "errors" - "sync/atomic" + "sync" "time" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/dsl" "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/sleep" + "github.com/elastic/fleet-server/v7/internal/pkg/sqn" "github.com/elastic/go-elasticsearch/v8" "github.com/rs/zerolog" @@ -22,7 +24,7 @@ import ( ) const ( - defaultCheckInterval = 1 * time.Second // check every second for the new action + defaultPollTimeout = 5 * time.Minute // default long poll timeout defaultSeqNo = int64(-1) // the _seq_no in elasticsearch start with 0 defaultWithExpiration = false @@ -34,7 +36,12 @@ const ( // One action can be split up into multiple documents up to the 1000 agents per action if needed. defaultFetchSize = 1000 - tightLoopCheckInterval = 10 * time.Millisecond // when we get a full page (fetchSize) of documents, use this interval to repeatedly poll for more records + // Retry delay on error waiting on the global checkpoint update. + // This is the wait time between requests to elastisearch in case if: + // 1. Index is not found (index is created only on the first document save) + // 2. Any other error waiting on global checkpoint, except timeouts. + // For the long poll timeout, start a new request as soon as possible. + retryDelay = 3 * time.Second ) const ( @@ -63,7 +70,7 @@ type HitsT struct { } type GlobalCheckpointProvider interface { - GetCheckpoint() int64 + GetCheckpoint() sqn.SeqNo } // SimpleMonitor monitors for new documents in an index @@ -83,16 +90,18 @@ type SimpleMonitor interface { // simpleMonitorT monitors for new documents in an index type simpleMonitorT struct { - cli *elasticsearch.Client + esCli *elasticsearch.Client + monCli *elasticsearch.Client tmplCheck *dsl.Tmpl tmplQuery *dsl.Tmpl index string - checkInterval time.Duration + pollTimeout time.Duration withExpiration bool fetchSize int - checkpoint int64 // index global checkpoint + checkpoint sqn.SeqNo // index global checkpoint + mx sync.RWMutex // checkpoint mutex log zerolog.Logger @@ -105,14 +114,16 @@ type simpleMonitorT struct { type Option func(SimpleMonitor) // New creates new simple monitor -func NewSimple(index string, cli *elasticsearch.Client, opts ...Option) (SimpleMonitor, error) { +func NewSimple(index string, esCli, monCli *elasticsearch.Client, opts ...Option) (SimpleMonitor, error) { + m := &simpleMonitorT{ index: index, - cli: cli, - checkInterval: defaultCheckInterval, + esCli: esCli, + monCli: monCli, + pollTimeout: defaultPollTimeout, withExpiration: defaultWithExpiration, fetchSize: defaultFetchSize, - checkpoint: defaultSeqNo, + checkpoint: sqn.DefaultSeqNo, outCh: make(chan []es.HitT, 1), } @@ -146,10 +157,10 @@ func WithFetchSize(fetchSize int) Option { } } -// WithCheckInterval sets a periodic check interval -func WithCheckInterval(interval time.Duration) Option { +// WithPollTimeout sets the global checkpoint polling timeout +func WithPollTimeout(to time.Duration) Option { return func(m SimpleMonitor) { - m.(*simpleMonitorT).checkInterval = interval + m.(*simpleMonitorT).pollTimeout = to } } @@ -173,17 +184,21 @@ func (m *simpleMonitorT) Output() <-chan []es.HitT { } // GetCheckpoint implements GlobalCheckpointProvider interface -func (m *simpleMonitorT) GetCheckpoint() int64 { +func (m *simpleMonitorT) GetCheckpoint() sqn.SeqNo { return m.loadCheckpoint() } -func (m *simpleMonitorT) storeCheckpoint(val int64) { - m.log.Debug().Int64("checkpoint", val).Msg("updated checkpoint") - atomic.StoreInt64(&m.checkpoint, val) +func (m *simpleMonitorT) storeCheckpoint(val sqn.SeqNo) { + m.log.Debug().Ints64("checkpoints", val).Msg("updated checkpoint") + m.mx.Lock() + defer m.mx.Unlock() + m.checkpoint = val.Clone() } -func (m *simpleMonitorT) loadCheckpoint() int64 { - return atomic.LoadInt64(&m.checkpoint) +func (m *simpleMonitorT) loadCheckpoint() sqn.SeqNo { + m.mx.RLock() + defer m.mx.RUnlock() + return m.checkpoint.Clone() } // Run runs monitor. @@ -200,10 +215,10 @@ func (m *simpleMonitorT) Run(ctx context.Context) (err error) { }() // Initialize global checkpoint from the index stats - var checkpoint int64 - checkpoint, err = queryGlobalCheckpoint(ctx, m.cli, m.index) + var checkpoint sqn.SeqNo + checkpoint, err = queryGlobalCheckpoint(ctx, m.monCli, m.index) if err != nil { - m.log.Error().Err(err).Msg("failed to initialize the global checkpoint") + m.log.Error().Err(err).Msg("failed to initialize the global checkpoints") return err } m.storeCheckpoint(checkpoint) @@ -214,29 +229,40 @@ func (m *simpleMonitorT) Run(ctx context.Context) (err error) { m.readyCh = nil } - // Start timer loop to check for global checkpoint changes - t := time.NewTimer(m.checkInterval) - defer t.Stop() for { - select { - case <-t.C: - interval := m.checkInterval + checkpoint := m.loadCheckpoint() + + // Wait checkpoint advance + newCheckpoint, err := waitCheckpointAdvance(ctx, m.monCli, m.index, checkpoint, m.pollTimeout) + if err != nil { + if errors.Is(err, es.ErrIndexNotFound) { + // Wait until created + m.log.Info().Msgf("index not found, try again in %v", retryDelay) + } else if errors.Is(err, es.ErrTimeout) { + // Timed out, wait again + m.log.Debug().Msg("wait global checkpoints advance, timeout, wait again") + continue + } else { + // Log the error and keep trying + m.log.Error().Err(err).Msg("failed waiting global checkpoints advance") + } - hits, err := m.check(ctx) + // Delay next attempt + err = sleep.WithContext(ctx, retryDelay) if err != nil { - m.log.Error().Err(err).Msg("failed checking new documents") - } else { - count := m.notify(ctx, hits) + return err + } + } - // Change check interval if fetched the full page (m.fetchSize) of documents - if count == m.fetchSize { - m.log.Debug().Int("count", count).Dur("wait_next_check", interval).Msg("tight loop check") - interval = tightLoopCheckInterval - } + // Fetch up to known checkpoint + count := m.fetchSize + for count == m.fetchSize { + hits, err := m.fetch(ctx, newCheckpoint) + if err != nil { + m.log.Error().Err(err).Msg("failed checking new documents") + break } - t.Reset(interval) - case <-ctx.Done(): - return ctx.Err() + count = m.notify(ctx, hits) } } } @@ -247,7 +273,7 @@ func (m *simpleMonitorT) notify(ctx context.Context, hits []es.HitT) int { select { case m.outCh <- hits: maxVal := hits[sz-1].SeqNo - m.storeCheckpoint(maxVal) + m.storeCheckpoint([]int64{maxVal}) return sz case <-ctx.Done(): } @@ -255,45 +281,21 @@ func (m *simpleMonitorT) notify(ctx context.Context, hits []es.HitT) int { return 0 } -func (m *simpleMonitorT) check(ctx context.Context) ([]es.HitT, error) { +func (m *simpleMonitorT) fetch(ctx context.Context, maxCheckpoint sqn.SeqNo) ([]es.HitT, error) { now := time.Now().UTC().Format(time.RFC3339) checkpoint := m.loadCheckpoint() // Run check query that detects that there are new documents available params := map[string]interface{}{ - dl.FieldSeqNo: checkpoint, + dl.FieldSeqNo: checkpoint.Value(), + dl.FieldMaxSeqNo: maxCheckpoint.Value(), } if m.withExpiration { params[dl.FieldExpiration] = now } - hits, err := m.search(ctx, m.tmplCheck, params) - if err != nil { - return nil, err - } - - if len(hits) == 0 { - return nil, nil - } - - // New documents are detected, fetch global checkpoint - gcp, err := queryGlobalCheckpoint(ctx, m.cli, m.index) - if err != nil { - m.log.Error().Err(err).Msg("failed to check the global checkpoint") - return nil, err - } - - // If global check point is still not greater that the current known checkpoint, return nothing - if gcp <= checkpoint { - return nil, nil - } - - // Fetch documents capped by the global checkpoint - // Reusing params for the documents query - params[dl.FieldMaxSeqNo] = gcp - - hits, err = m.search(ctx, m.tmplQuery, params) + hits, err := m.search(ctx, m.tmplQuery, params) if err != nil { return nil, err } @@ -307,10 +309,10 @@ func (m *simpleMonitorT) search(ctx context.Context, tmpl *dsl.Tmpl, params map[ return nil, err } - res, err := m.cli.Search( - m.cli.Search.WithContext(ctx), - m.cli.Search.WithIndex(m.index), - m.cli.Search.WithBody(bytes.NewBuffer(query)), + res, err := m.esCli.Search( + m.esCli.Search.WithContext(ctx), + m.esCli.Search.WithIndex(m.index), + m.esCli.Search.WithBody(bytes.NewBuffer(query)), ) if err != nil { return nil, err diff --git a/internal/pkg/monitor/monitor_integration_test.go b/internal/pkg/monitor/monitor_integration_test.go index eb1ebefb2..82cdd87cc 100644 --- a/internal/pkg/monitor/monitor_integration_test.go +++ b/internal/pkg/monitor/monitor_integration_test.go @@ -10,7 +10,6 @@ import ( "context" "sync" "testing" - "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -21,8 +20,6 @@ import ( ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" ) -const testMonitorIntervalMS = 100 - func setupIndex(ctx context.Context, t *testing.T) (string, bulk.Bulk) { index, bulker := ftesting.SetupIndexWithBulk(ctx, t, es.MappingAction) return index, bulker @@ -46,8 +43,7 @@ func TestSimpleMonitorNonEmptyIndex(t *testing.T) { func runSimpleMonitorTest(t *testing.T, ctx context.Context, index string, bulker bulk.Bulk) { readyCh := make(chan error) - mon, err := NewSimple(index, bulker.Client(), - WithCheckInterval(testMonitorIntervalMS*time.Millisecond), + mon, err := NewSimple(index, bulker.Client(), bulker.Client(), WithReadyChan(readyCh), ) require.NoError(t, err) diff --git a/internal/pkg/monitor/subscription_monitor.go b/internal/pkg/monitor/subscription_monitor.go index 06907d5ec..f0880cebe 100644 --- a/internal/pkg/monitor/subscription_monitor.go +++ b/internal/pkg/monitor/subscription_monitor.go @@ -6,11 +6,13 @@ package monitor import ( "context" - "github.com/elastic/fleet-server/v7/internal/pkg/es" "sync" "sync/atomic" "time" + "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/sqn" + "github.com/elastic/go-elasticsearch/v8" "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" @@ -60,8 +62,8 @@ type monitorT struct { } // New creates new subscription monitor -func New(index string, cli *elasticsearch.Client, opts ...Option) (Monitor, error) { - sm, err := NewSimple(index, cli, opts...) +func New(index string, esCli, monCli *elasticsearch.Client, opts ...Option) (Monitor, error) { + sm, err := NewSimple(index, esCli, monCli, opts...) if err != nil { return nil, err } @@ -75,7 +77,7 @@ func New(index string, cli *elasticsearch.Client, opts ...Option) (Monitor, erro return m, nil } -func (m *monitorT) GetCheckpoint() int64 { +func (m *monitorT) GetCheckpoint() sqn.SeqNo { return m.sm.GetCheckpoint() } diff --git a/internal/pkg/monitor/subscription_monitor_integration_test.go b/internal/pkg/monitor/subscription_monitor_integration_test.go index d24a894cf..22d226a33 100644 --- a/internal/pkg/monitor/subscription_monitor_integration_test.go +++ b/internal/pkg/monitor/subscription_monitor_integration_test.go @@ -10,7 +10,6 @@ import ( "context" "sync" "testing" - "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -38,8 +37,7 @@ func TestMonitorNonEmptyIndex(t *testing.T) { func runMonitorTest(t *testing.T, ctx context.Context, index string, bulker bulk.Bulk) { readyCh := make(chan error) - mon, err := New(index, bulker.Client(), - WithCheckInterval(testMonitorIntervalMS*time.Millisecond), + mon, err := New(index, bulker.Client(), bulker.Client(), WithReadyChan(readyCh), ) require.NoError(t, err) diff --git a/internal/pkg/policy/monitor_integration_test.go b/internal/pkg/policy/monitor_integration_test.go index 5983b8ab3..6bbb83b7f 100644 --- a/internal/pkg/policy/monitor_integration_test.go +++ b/internal/pkg/policy/monitor_integration_test.go @@ -22,8 +22,6 @@ import ( ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" ) -const testMonitorIntervalMS = 100 - func setupIndex(ctx context.Context, t *testing.T) (string, bulk.Bulk) { index, bulker := ftesting.SetupIndexWithBulk(ctx, t, es.MappingPolicy) return index, bulker @@ -34,7 +32,7 @@ func TestMonitor_Integration(t *testing.T) { defer cancel() index, bulker := setupIndex(ctx, t) - im, err := monitor.New(index, bulker.Client(), monitor.WithCheckInterval(testMonitorIntervalMS)) + im, err := monitor.New(index, bulker.Client(), bulker.Client()) if err != nil { t.Fatal(err) } diff --git a/internal/pkg/policy/monitor_test.go b/internal/pkg/policy/monitor_test.go index b67bd45e4..7057aaf8d 100644 --- a/internal/pkg/policy/monitor_test.go +++ b/internal/pkg/policy/monitor_test.go @@ -160,6 +160,7 @@ func TestMonitor_SamePolicy(t *testing.T) { gotPolicy := false tm := time.NewTimer(1 * time.Second) + defer tm.Stop() select { case <-s.Output(): gotPolicy = true @@ -233,6 +234,7 @@ func TestMonitor_NewPolicyUncoordinated(t *testing.T) { gotPolicy := false tm := time.NewTimer(1 * time.Second) + defer tm.Stop() select { case <-s.Output(): gotPolicy = true diff --git a/internal/pkg/sqn/sqn.go b/internal/pkg/sqn/sqn.go index a2d2def60..d9832fe4f 100644 --- a/internal/pkg/sqn/sqn.go +++ b/internal/pkg/sqn/sqn.go @@ -11,6 +11,8 @@ import ( const UndefinedSeqNo = -1 +var DefaultSeqNo = []int64{UndefinedSeqNo} + // Abstracts the array of document seq numbers type SeqNo []int64 @@ -25,9 +27,20 @@ func (s SeqNo) IsSet() bool { return len(s) > 0 && s[0] >= 0 } -func (s SeqNo) Get(idx int) int64 { - if idx < len(s) { - return s[idx] +// Returns one/first value until we get and API to get the next checkpoints on search +func (s SeqNo) Value() int64 { + if len(s) == 0 { + return UndefinedSeqNo + } + return s[0] +} + +func (s SeqNo) Clone() SeqNo { + if s == nil { + return nil } - return UndefinedSeqNo + + r := make(SeqNo, len(s)) + copy(r, s) + return r } diff --git a/internal/pkg/testing/esutil/bootstrap.go b/internal/pkg/testing/esutil/bootstrap.go index 535f201d7..f8242971b 100644 --- a/internal/pkg/testing/esutil/bootstrap.go +++ b/internal/pkg/testing/esutil/bootstrap.go @@ -22,13 +22,7 @@ type indexConfig struct { var indexConfigs = map[string]indexConfig{ // Commenting out the boostrapping for now here, just in case if it needs to be "enabled" again. // Will remove all the boostrapping code completely later once all is fully integrated - ".fleet-actions": {mapping: es.MappingAction}, - ".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true}, - ".fleet-agents": {mapping: es.MappingAgent}, - ".fleet-enrollment-api-keys": {mapping: es.MappingEnrollmentApiKey}, - ".fleet-policies": {mapping: es.MappingPolicy}, - ".fleet-policies-leader": {mapping: es.MappingPolicyLeader}, - ".fleet-servers": {mapping: es.MappingServer}, + ".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true}, } // Bootstrap creates .fleet-actions data stream