From 5badd10e2c02eb3bcbd9a6a7abeba6aa3889ac14 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Mon, 13 May 2024 15:31:34 +0100 Subject: [PATCH] Event-handler: call `SearchUnstructuredEvents` with smaller windows This PR aims to reduce the windows sent from `SearchUnstructuredEvents`. StartTime is never update so it's kept with its initial value which causes problems to the event handler sending windows that include more than 1Gb of data when using the Athena backend which causes failures. Signed-off-by: Tiago Silva --- event-handler/teleport_events_watcher.go | 51 +++++++++++--- event-handler/teleport_events_watcher_test.go | 69 ++++++++++++++++++- 2 files changed, 109 insertions(+), 11 deletions(-) diff --git a/event-handler/teleport_events_watcher.go b/event-handler/teleport_events_watcher.go index f715f3191..f84ac5f4b 100644 --- a/event-handler/teleport_events_watcher.go +++ b/event-handler/teleport_events_watcher.go @@ -71,6 +71,7 @@ type TeleportEventsWatcher struct { config *StartCmdConfig // startTime is event time frame start startTime time.Time + state *State } // NewTeleportEventsWatcher builds Teleport client instance @@ -80,6 +81,7 @@ func NewTeleportEventsWatcher( startTime time.Time, cursor string, id string, + state *State, ) (*TeleportEventsWatcher, error) { var creds []client.Credentials switch { @@ -207,16 +209,45 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error { // getEvents calls Teleport client and loads events func (t *TeleportEventsWatcher) getEvents(ctx context.Context) ([]*auditlogpb.EventUnstructured, string, error) { - return t.client.SearchUnstructuredEvents( - ctx, - t.startTime, - time.Now().UTC(), - "default", - t.config.Types, - t.config.BatchSize, - types.EventOrderAscending, - t.cursor, - ) + rangeSplitByDay := splitRangeByDay(t.startTime, time.Now().UTC()) + for i := 1; i < len(rangeSplitByDay); i++ { + startTime := rangeSplitByDay[i-1] + endTime := rangeSplitByDay[i] + log.Debugf("Fetching events from %v to %v", startTime, endTime) + evts, cursor, err := t.client.SearchUnstructuredEvents( + ctx, + startTime, + endTime, + "default", + t.config.Types, + t.config.BatchSize, + types.EventOrderAscending, + t.cursor, + ) + if err != nil { + return nil, "", trace.Wrap(err) + } + + // if no events are found, the cursor is out of the range [startTime, endTime] + // and it's the last complete day, update start time to the next day. + if len(evts) == 0 && i < len(rangeSplitByDay)-2 { + log.Infof("No events found for the range %v to %v", startTime, endTime) + t.startTime = endTime + continue + } + // if any events are found, return them + return evts, cursor, nil + } + return nil, t.cursor, nil +} + +func splitRangeByDay(from, to time.Time) []time.Time { + // splitRangeByDay splits the range into days + var days []time.Time + for d := from; d.Before(to); d = d.AddDate(0, 0, 1) { + days = append(days, d) + } + return append(days, to) // add the last date } // pause sleeps for timeout seconds diff --git a/event-handler/teleport_events_watcher_test.go b/event-handler/teleport_events_watcher_test.go index b3242ac7c..846febf2b 100644 --- a/event-handler/teleport_events_watcher_test.go +++ b/event-handler/teleport_events_watcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "strconv" "sync" "testing" @@ -27,8 +28,8 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/trace" + "github.com/peterbourgon/diskv/v3" "github.com/stretchr/testify/require" - "golang.org/x/net/context" ) // mockTeleportEventWatcher is Teleport client mock @@ -121,6 +122,12 @@ func (c *mockTeleportEventWatcher) Close() error { } func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClient) *TeleportEventsWatcher { + dv := diskv.New(diskv.Options{ + BasePath: t.TempDir(), + Transform: func(s string) []string { return []string{} }, + CacheSizeMax: cacheSizeMaxBytes, + }) + client := &TeleportEventsWatcher{ client: eventsClient, pos: -1, @@ -130,6 +137,7 @@ func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClie ExitOnLastEvent: true, }, }, + state: &State{dv}, } return client @@ -414,3 +422,62 @@ func TestValidateConfig(t *testing.T) { }) } } + +func Test_splitRangeByDay(t *testing.T) { + type args struct { + from time.Time + to time.Time + } + tests := []struct { + name string + args args + want []time.Time + }{ + { + name: "Same day", + args: args{ + from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 1, 23, 59, 59, 0, time.UTC), + }, + want: []time.Time{ + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 1, 23, 59, 59, 0, time.UTC), + }, + }, + { + name: "Two days", + args: args{ + from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 2, 23, 59, 59, 0, time.UTC), + }, + want: []time.Time{ + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 2, 23, 59, 59, 0, time.UTC), + }, + }, + { + name: "week", + args: args{ + from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 7, 23, 59, 59, 0, time.UTC), + }, + want: []time.Time{ + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 5, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 6, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 7, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 7, 23, 59, 59, 0, time.UTC), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := splitRangeByDay(tt.args.from, tt.args.to) + require.Equal(t, tt.want, got) + }) + } +}