Skip to content

Commit

Permalink
Improve expired actions cleanup, use _delete_by_query instead (#906)
Browse files Browse the repository at this point in the history
* Improve expired actions cleanup, use _delete_by_query instead

(cherry picked from commit fae23a3)
  • Loading branch information
aleksmaus authored and mergify-bot committed Nov 23, 2021
1 parent 99dec47 commit b56a258
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 125 deletions.
2 changes: 1 addition & 1 deletion cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *

// Run scheduler for periodic GC/cleanup
gcCfg := cfg.Inputs[0].Server.GC
sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupBeforeInteval))
sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval))
if err != nil {
return fmt.Errorf("failed to create elasticsearch GC: %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions internal/pkg/config/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ package config
import "time"

const (
defaultScheduleInterval = time.Hour
defaultCleanupBeforeInterval = 30 * 24 * time.Hour // cleanup expired actions with expiration time older than 30 days from now
defaultScheduleInterval = time.Hour
defaultCleanupIntervalAfterExpired = "30d" // cleanup expired actions with expiration time older than 30 days from now
)

// GC is the configuration for the Fleet Server data garbage collection.
// Currently manages the expired actions cleanup
type GC struct {
ScheduleInterval time.Duration `config:"schedule_interval"`
CleanupBeforeInteval time.Duration `config:"cleanup_before_interval"`
ScheduleInterval time.Duration `config:"schedule_interval"`
CleanupAfterExpiredInterval string `config:"cleanup_after_expired_interval"`
}

func (g *GC) InitDefaults() {
g.ScheduleInterval = defaultScheduleInterval
g.CleanupBeforeInteval = defaultCleanupBeforeInterval
g.CleanupAfterExpiredInterval = defaultCleanupIntervalAfterExpired
}
55 changes: 51 additions & 4 deletions internal/pkg/dl/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
package dl

import (
"bytes"
"context"
"encoding/json"
"errors"
"time"

Expand All @@ -31,7 +33,8 @@ var (
QueryAgentActions = prepareFindAgentActions()

// Query for expired actions GC
QueryExpiredActions = prepareFindExpiredAction()
QueryDeleteExpiredActions = prepareDeleteExpiredAction()
QueryFindExpiredActions = prepareFindExpiredAction()
)

func prepareFindAllAgentsActions() *dsl.Tmpl {
Expand All @@ -50,6 +53,15 @@ func prepareFindAction() *dsl.Tmpl {
return tmpl
}

// prepareDeleteExpiredAction builds the query template used when deleting
// expired actions: a bool/filter query holding a single range clause on the
// expiration field. The upper bound (lte) of the range is a template binding
// named after the expiration field, so its value is supplied at render time.
func prepareDeleteExpiredAction() *dsl.Tmpl {
	var (
		t    = dsl.NewTmpl()
		node = dsl.NewRoot()
	)

	filterNode := node.Query().Bool().Filter()
	upperBound := dsl.WithRangeLTE(t.Bind(FieldExpiration))
	filterNode.Range(FieldExpiration, upperBound)

	// Resolve the bindings against the finished query tree before handing it out.
	t.MustResolve(node)
	return t
}

func prepareFindExpiredAction() *dsl.Tmpl {
tmpl := dsl.NewTmpl()
root := dsl.NewRoot()
Expand Down Expand Up @@ -114,8 +126,43 @@ func FindAgentActions(ctx context.Context, bulker bulk.Bulk, minSeqNo, maxSeqNo
return hitsToActions(res.Hits)
}

func FindExpiredAtionsHits(ctx context.Context, bulker bulk.Bulk, expiredBefore time.Time, size int) ([]es.HitT, error) {
return FindExpiredActionsHitsForIndex(ctx, FleetActions, bulker, expiredBefore, size)
// DeleteExpiredForIndex removes expired actions from index with a single
// delete-by-query request and reports how many documents were deleted.
//
// The query matches documents whose expiration is less than or equal to
// "now-<cleanupIntervalAfterExpired>". The interval is concatenated as-is, so
// the caller is responsible for passing a valid interval string (e.g. "30d").
//
// An error response that translates to es.ErrIndexNotFound is logged at debug
// level and reported as (0, nil). Any other failure (rendering the query,
// sending the request, decoding the response body, or a translated error
// response) is returned together with a zero count.
func DeleteExpiredForIndex(ctx context.Context, index string, bulker bulk.Bulk, cleanupIntervalAfterExpired string) (count int64, err error) {
	body, err := QueryDeleteExpiredActions.Render(map[string]interface{}{
		FieldExpiration: "now-" + cleanupIntervalAfterExpired,
	})
	if err != nil {
		return 0, err
	}

	resp, err := bulker.Client().API.DeleteByQuery(
		[]string{index},
		bytes.NewReader(body),
		bulker.Client().API.DeleteByQuery.WithContext(ctx),
	)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	// The body is decoded before the status is inspected because the error
	// details, when present, are carried in the same JSON document.
	var result es.DeleteByQueryResponse
	if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return 0, err
	}

	if !resp.IsError() {
		return result.Deleted, nil
	}

	err = es.TranslateError(resp.StatusCode, &result.Error)
	switch {
	case err == nil:
		// Error status without a translatable error: report the decoded count.
		return result.Deleted, nil
	case errors.Is(err, es.ErrIndexNotFound):
		// A missing index means there is nothing to clean up.
		log.Debug().Str("index", index).Msg(es.ErrIndexNotFound.Error())
		return 0, nil
	default:
		return 0, err
	}
}

func FindExpiredActionsHitsForIndex(ctx context.Context, index string, bulker bulk.Bulk, expiredBefore time.Time, size int) ([]es.HitT, error) {
Expand All @@ -124,7 +171,7 @@ func FindExpiredActionsHitsForIndex(ctx context.Context, index string, bulker bu
FieldSize: size,
}

res, err := findActionsHits(ctx, bulker, QueryExpiredActions, index, params, nil)
res, err := findActionsHits(ctx, bulker, QueryFindExpiredActions, index, params, nil)
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions internal/pkg/es/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package es

import (
"encoding/json"

"github.com/elastic/fleet-server/v7/internal/pkg/model"
)

Expand Down Expand Up @@ -128,6 +129,15 @@ type Response struct {
Error ErrorT `json:"error,omitempty"`
}

// DeleteByQueryResponse holds the fields of a delete-by-query response body
// that are decoded from JSON. Each field maps to the JSON key named in its tag.
type DeleteByQueryResponse struct {
// Status is read from the "status" key of the response body.
Status int `json:"status"`
// Took is read from the "took" key (presumably the request duration; confirm units against the Elasticsearch docs).
Took uint64 `json:"took"`
// TimedOut is read from the "timed_out" key.
TimedOut bool `json:"timed_out"`
// Deleted is the number of documents deleted, read from the "deleted" key.
Deleted int64 `json:"deleted"`

// Error carries the error details of a failed request, read from the "error" key.
Error ErrorT `json:"error,omitempty"`
}

type ResultT struct {
HitsT
Aggregations map[string]Aggregation
Expand Down
139 changes: 37 additions & 102 deletions internal/pkg/gc/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,144 +6,79 @@ package gc

import (
"context"
"math/rand"
"net/http"
"time"
"strconv"
"strings"

"github.com/rs/zerolog/log"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/scheduler"
"github.com/elastic/fleet-server/v7/internal/pkg/wait"
)

const (
defaultMaxWaitInCleanupLoop = 10 * time.Second // the wait in the cleanup loop between iteration is random
defaultActionsSelectSize = 1000
defaultActionsCleanupBeforeInterval = 24 * 30 * time.Hour
)

type ActionsCleanupConfig struct {
maxWaitInCleanupLoop time.Duration
actionsSelectSize int
cleanupBeforeInterval time.Duration
cleanupIntervalAfterExpired string
}

type ActionsCleanupOpt func(c *ActionsCleanupConfig)

func WithMaxWaitInCleanupLoop(maxWaitInCleanupLoop time.Duration) ActionsCleanupOpt {
return func(c *ActionsCleanupConfig) {
c.maxWaitInCleanupLoop = maxWaitInCleanupLoop
// isIntervalStringValid validates the interval string according to the Elasticsearch documentation
// https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math
func isIntervalStringValid(interval string) bool {
if len(interval) < 2 {
return false
}
if strings.HasPrefix(interval, "-") {
return false
}
}

func WithActionSelectSize(actionsSelectSize int) ActionsCleanupOpt {
return func(c *ActionsCleanupConfig) {
c.actionsSelectSize = actionsSelectSize
num := interval[0 : len(interval)-1]
suffix := interval[len(num):]

switch suffix {
case "y", "M", "w", "d", "h", "H", "m", "s":
if _, err := strconv.Atoi(num); err == nil {
return true
}
}

return false
}

func WithActionCleanupBeforeInterval(cleanupBeforeInterval time.Duration) ActionsCleanupOpt {
func WithCleanupIntervalAfterExpired(cleanupIntervalAfterExpired string) ActionsCleanupOpt {
return func(c *ActionsCleanupConfig) {
c.cleanupBeforeInterval = cleanupBeforeInterval
// Use the interval if valid, otherwise keep the default
if isIntervalStringValid(cleanupIntervalAfterExpired) {
c.cleanupIntervalAfterExpired = cleanupIntervalAfterExpired
}
}
}

func getActionsGCFunc(bulker bulk.Bulk, cleanupBeforeInterval time.Duration) scheduler.WorkFunc {
func getActionsGCFunc(bulker bulk.Bulk, cleanupIntervalAfterExpired string) scheduler.WorkFunc {
return func(ctx context.Context) error {
return cleanupActions(ctx, dl.FleetActions, bulker,
WithActionCleanupBeforeInterval(cleanupBeforeInterval))
WithCleanupIntervalAfterExpired(cleanupIntervalAfterExpired))
}
}

func cleanupActions(ctx context.Context, index string, bulker bulk.Bulk, opts ...ActionsCleanupOpt) error {
log := log.With().Str("ctx", "fleet actions cleanup").Logger()

c := ActionsCleanupConfig{
cleanupBeforeInterval: defaultActionsCleanupBeforeInterval,
actionsSelectSize: defaultActionsSelectSize,
maxWaitInCleanupLoop: defaultMaxWaitInCleanupLoop,
cleanupIntervalAfterExpired: defaultCleanupIntervalAfterExpired,
}

for _, opt := range opts {
opt(&c)
}

// Cleanup expired actions where expired timestamp is older than current time minus cleanupBeforeInterval
// Example: cleanup up actions that expired more than two weeks ago
expiredBefore := time.Now().Add(-c.cleanupBeforeInterval)
log := log.With().Str("ctx", "fleet actions cleanup").Str("interval", "now-"+c.cleanupIntervalAfterExpired).Logger()

// Random generator for calculating random pause duration in the cleanup loop
r := rand.New(rand.NewSource(time.Now().Unix()))
log.Debug().Msg("delete expired actions")

var (
hits []es.HitT
err error
)

for {
log.Debug().Str("expired_before", expiredBefore.UTC().Format(time.RFC3339)).Msgf("find actions that expired before given date/time")
hits, err = dl.FindExpiredActionsHitsForIndex(ctx, index, bulker, expiredBefore, c.actionsSelectSize)
if err != nil {
return err
}

if len(hits) == 0 {
log.Debug().Msg("no more expired actions found, done cleaning")
return nil
}

log.Debug().Int("count", len(hits)).Msg("delete expired actions")
if len(hits) > 0 {
ops := make([]bulk.MultiOp, len(hits))
for i := 0; i < len(hits); i++ {
ops[i] = bulk.MultiOp{Index: index, Id: hits[i].Id}
}

res, err := bulker.MDelete(ctx, ops)
if err != nil {
// The error is logged
log.Debug().Err(err).Msg("failed to delete actions")
if eserr, ok := err.(*es.ErrElastic); ok {
if eserr.Status == http.StatusNotFound {
err = nil
}
}
if err != nil {
return err
}
}
for i, r := range res {
if r.Error != nil {
err = es.TranslateError(r.Status, r.Error)
if err != nil {
log.Debug().Err(err).Str("action_id", hits[i].Id).Msg("failed to delete action")
if r.Status != http.StatusNotFound {
return err
}
}
}
}
}

// If number of records selected is less than max, can exit the cleanup loop
if len(hits) < c.actionsSelectSize {
return nil
}

// The full number of hits was returned
// Can potentially have more records
// Pause before doing another iteration
if c.maxWaitInCleanupLoop > 0 {
pauseDuration := time.Duration(r.Int63n(int64(c.maxWaitInCleanupLoop)))
log.Debug().Dur("pause_duration", pauseDuration).Msg("more actions could be avaiable, pause before the next cleanup cycle")
// Wait with context some random short interval to avoid tight loops
err = wait.WithContext(ctx, pauseDuration)
if err != nil {
return err
}
}
deleted, err := dl.DeleteExpiredForIndex(ctx, index, bulker, c.cleanupIntervalAfterExpired)
if err != nil {
log.Debug().Err(err).Msg("failed to delete actions")
return err
}
log.Debug().Int64("count", deleted).Msg("deleted expired actions")
return nil
}
12 changes: 5 additions & 7 deletions internal/pkg/gc/actions_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func TestCleanupActions(t *testing.T) {

func testCleanupActionsWithSelectSize(t *testing.T, skipIndexInitialization bool, selectSize int) {
const (
thirtyDays = 24 * 30 * time.Hour
thirtyDaysAndHour = thirtyDays + time.Hour
thirtyDays = "720h"
thirtyDaysAndHour = "721h"
)
var (
index string
Expand All @@ -72,7 +72,7 @@ func testCleanupActionsWithSelectSize(t *testing.T, skipIndexInitialization bool
ftesting.CreateActionsWithMaxAgentsCount(7),
ftesting.CreateActionsWithMinActionsCount(7),
ftesting.CreateActionsWithMaxActionsCount(15),
ftesting.CreateActionsWithTimestampOffset(-thirtyDaysAndHour),
ftesting.CreateActionsWithTimestampOffset(-((24*30)+1)*time.Hour),
)
if err != nil {
t.Fatal(err)
Expand All @@ -99,17 +99,15 @@ func testCleanupActionsWithSelectSize(t *testing.T, skipIndexInitialization bool
}

err = cleanupActions(ctx, index, bulker,
WithActionCleanupBeforeInterval(thirtyDays),
WithActionSelectSize(selectSize), // smaller size test few loop passes
WithMaxWaitInCleanupLoop(0))
WithCleanupIntervalAfterExpired(thirtyDays))
if err != nil {
t.Fatal(err)
}

time.Sleep(time.Second)

// Check that all expired actions were deleted
hits, err := dl.FindExpiredActionsHitsForIndex(ctx, index, bulker, time.Now().Add(-thirtyDays), 100)
hits, err := dl.FindExpiredActionsHitsForIndex(ctx, index, bulker, time.Now().Add(-24*30*time.Hour), 100)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit b56a258

Please sign in to comment.