From 647df28d5dfcdea95f98e4853d2c6106c2799bd2 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Fri, 17 May 2024 20:03:53 +0100 Subject: [PATCH] Event-handler: call `SearchUnstructuredEvents` with smaller windows This PR aims to reduce the windows sent from `SearchUnstructuredEvents`. StartTime is never updated so it's kept with its initial value which causes problems to the event handler sending windows that include more than 1Gb of data when using the Athena backend which causes failures. Syncs with https://github.com/gravitational/teleport-plugins/pull/1068 Related to https://github.com/gravitational/teleport/issues/41544 Signed-off-by: Tiago Silva --- integrations/event-handler/app.go | 16 +- integrations/event-handler/state.go | 37 ++- .../event-handler/teleport_events_watcher.go | 153 ++++++--- .../teleport_events_watcher_test.go | 300 ++++++++++++++---- 4 files changed, 391 insertions(+), 115 deletions(-) diff --git a/integrations/event-handler/app.go b/integrations/event-handler/app.go index 34ca3038287af..bcc6feb7bc36d 100644 --- a/integrations/event-handler/app.go +++ b/integrations/event-handler/app.go @@ -79,6 +79,9 @@ func (a *App) Run(ctx context.Context) error { a.SpawnCriticalJob(a.sessionEventsJob) <-a.Process.Done() + lastWindow := a.EventWatcher.getWindowStartTime() + a.State.SetLastWindowTime(&lastWindow) + return a.Err() } @@ -180,7 +183,18 @@ func (a *App) init(ctx context.Context) error { return trace.Wrap(err) } - t, err := NewTeleportEventsWatcher(ctx, a.Config, *startTime, latestCursor, latestID) + lastWindowTime, err := s.GetLastWindowTime() + if err != nil { + return trace.Wrap(err) + } + // if lastWindowTime is nil, set it to startTime + // lastWindowTime is used to track the last window of events ingested + // and is updated on exit + if lastWindowTime == nil { + lastWindowTime = startTime + } + + t, err := NewTeleportEventsWatcher(ctx, a.Config, *lastWindowTime, latestCursor, latestID) if err != nil { return trace.Wrap(err) } diff --git a/integrations/event-handler/state.go b/integrations/event-handler/state.go index a5df1861f54c0..517e31be6cced 100644 --- a/integrations/event-handler/state.go +++ b/integrations/event-handler/state.go @@ -39,6 +39,9 @@ const ( // startTimeName is the start time variable name startTimeName = "start_time" + // windowTimeName is the start time of the last window. + windowTimeName = "window_time" + // cursorName is the cursor variable name cursorName = "cursor" @@ -121,38 +124,52 @@ func createStorageDir(c *StartCmdConfig) (string, error) { // GetStartTime gets current start time func (s *State) GetStartTime() (*time.Time, error) { - if !s.dv.Has(startTimeName) { + return s.getTimeKey(startTimeName) +} + +// SetStartTime sets current start time +func (s *State) SetStartTime(t *time.Time) error { + return s.setTimeKey(startTimeName, t) +} + +// GetLastWindowTime gets current start time +func (s *State) GetLastWindowTime() (*time.Time, error) { + return s.getTimeKey(windowTimeName) +} + +func (s *State) getTimeKey(keyName string) (*time.Time, error) { + if !s.dv.Has(keyName) { return nil, nil } - b, err := s.dv.Read(startTimeName) + b, err := s.dv.Read(keyName) if err != nil { return nil, trace.Wrap(err) } - // No previous start time exist if string(b) == "" { return nil, nil } - t, err := time.Parse(time.RFC3339, string(b)) if err != nil { return nil, trace.Wrap(err) } - t = t.Truncate(time.Second) - return &t, nil } -// SetStartTime sets current start time -func (s *State) SetStartTime(t *time.Time) error { +func (s *State) setTimeKey(keyName string, t *time.Time) error { if t == nil { - return s.dv.Write(startTimeName, []byte("")) + return s.dv.Write(keyName, []byte("")) } v := t.Truncate(time.Second).Format(time.RFC3339) - return s.dv.Write(startTimeName, []byte(v)) + return s.dv.Write(keyName, []byte(v)) +} + +// SetLastWindowTime sets current start time of the last window used. +func (s *State) SetLastWindowTime(t *time.Time) error { + return s.setTimeKey(windowTimeName, t) } // GetCursor gets current cursor value diff --git a/integrations/event-handler/teleport_events_watcher.go b/integrations/event-handler/teleport_events_watcher.go index c124bce9d39ed..fdff60bc22328 100644 --- a/integrations/event-handler/teleport_events_watcher.go +++ b/integrations/event-handler/teleport_events_watcher.go @@ -19,6 +19,7 @@ package main import ( "context" "fmt" + "sync" "time" "github.com/gravitational/trace" @@ -70,15 +71,17 @@ type TeleportEventsWatcher struct { batch []*TeleportEvent // config is teleport config config *StartCmdConfig - // startTime is event time frame start - startTime time.Time + + // windowStartTime is event time frame start + windowStartTime time.Time + windowStartTimeMu sync.Mutex } // NewTeleportEventsWatcher builds Teleport client instance func NewTeleportEventsWatcher( ctx context.Context, c *StartCmdConfig, - startTime time.Time, + windowStartTime time.Time, cursor string, id string, ) (*TeleportEventsWatcher, error) { @@ -119,12 +122,12 @@ func NewTeleportEventsWatcher( } tc := TeleportEventsWatcher{ - client: teleportClient, - pos: -1, - cursor: cursor, - config: c, - id: id, - startTime: startTime, + client: teleportClient, + pos: -1, + cursor: cursor, + config: c, + id: id, + windowStartTime: windowStartTime, } return &tc, nil @@ -151,43 +154,29 @@ func (t *TeleportEventsWatcher) flipPage() bool { // fetch fetches the page and sets the position to the event after latest known func (t *TeleportEventsWatcher) fetch(ctx context.Context) error { log := logger.Get(ctx) - b, nextCursor, err := t.getEvents(ctx) + // Zero batch + t.batch = make([]*TeleportEvent, 0, t.config.BatchSize) + nextCursor, err := t.getEvents(ctx) if err != nil { return trace.Wrap(err) } - // Zero batch - t.batch = make([]*TeleportEvent, 0, len(b)) - // Save next cursor t.nextCursor = nextCursor // Mark position as unresolved (the page is empty) t.pos = -1 - log.WithField("cursor", t.cursor).WithField("next", nextCursor).WithField("len", len(b)).Debug("Fetched page") + log.WithField("cursor", t.cursor).WithField("next", nextCursor).WithField("len", len(t.batch)).Debug("Fetched page") // Page is empty: do nothing, return - if len(b) == 0 { + if len(t.batch) == 0 { + t.pos = 0 return nil } pos := 0 - // Convert batch to TeleportEvent - for _, e := range b { - if _, ok := t.config.SkipEventTypes[e.Type]; ok { - log.WithField("event", e).Debug("Skipping event") - continue - } - evt, err := NewTeleportEvent(e, t.cursor) - if err != nil { - return trace.Wrap(err) - } - - t.batch = append(t.batch, evt) - } - // If last known id is not empty, let's try to find it's pos if t.id != "" { for i, e := range t.batch { @@ -206,18 +195,98 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error { return nil } -// getEvents calls Teleport client and loads events -func (t *TeleportEventsWatcher) getEvents(ctx context.Context) ([]*auditlogpb.EventUnstructured, string, error) { - return t.client.SearchUnstructuredEvents( +// getEvents iterates over the range of days between the last windowStartTime and now. +// It returns a slice of events, a cursor for the next page and an error. +// If the cursor is out of the range, it advances the windowStartTime to the next day. +// It only advances the windowStartTime if no events are found until the last complete day. +func (t *TeleportEventsWatcher) getEvents(ctx context.Context) (string, error) { + rangeSplitByDay := splitRangeByDay(t.getWindowStartTime(), time.Now().UTC()) + for i := 1; i < len(rangeSplitByDay); i++ { + startTime := rangeSplitByDay[i-1] + endTime := rangeSplitByDay[i] + log.Debugf("Fetching events from %v to %v", startTime, endTime) + evts, cursor, err := t.getEventsInWindow(ctx, startTime, endTime) + if err != nil { + return "", trace.Wrap(err) + } + + // Convert batch to TeleportEvent + for _, e := range evts { + if _, ok := t.config.SkipEventTypes[e.Type]; ok { + log.WithField("event", e).Debug("Skipping event") + continue + } + evt, err := NewTeleportEvent(e, t.cursor) + if err != nil { + return "", trace.Wrap(err) + } + + t.batch = append(t.batch, evt) + } + + // if no events are found, the cursor is out of the range [startTime, endTime] + // and it's the last complete day, update start time to the next day. + if t.canSkipToNextWindow(i, rangeSplitByDay, cursor) { + log.Infof("No new events found for the range %v to %v", startTime, endTime) + t.setWindowStartTime(endTime) + continue + } + // if any events are found, return them + return cursor, nil + } + return t.cursor, nil +} + +func (t *TeleportEventsWatcher) canSkipToNextWindow(i int, rangeSplitByDay []time.Time, cursor string) bool { + if cursor != "" { + return false + + } + if len(t.batch) == 0 && i < len(rangeSplitByDay)-1 { + log.Infof("No events found for the range %v to %v", rangeSplitByDay[i-1], rangeSplitByDay[i]) + return true + } + pos := 0 + // If last known id is not empty, let's try to find if all events are already processed + // and if we can skip to next page + if t.id != "" { + for i, e := range t.batch { + if e.ID == t.id { + pos = i + 1 + } + } + } + + if i < len(rangeSplitByDay)-1 && pos >= len(t.batch) { + log.WithField("pos", pos).WithField("len", len(t.batch)).Infof("No new events found for the range %v to %v", rangeSplitByDay[i-1], rangeSplitByDay[i]) + return true + } + return false +} + +// getEvents calls Teleport client and loads events from the audit log. +// It returns a slice of events, a cursor for the next page and an error. +func (t *TeleportEventsWatcher) getEventsInWindow(ctx context.Context, from, to time.Time) ([]*auditlogpb.EventUnstructured, string, error) { + evts, cursor, err := t.client.SearchUnstructuredEvents( ctx, - t.startTime, - time.Now().UTC(), + from, + to, "default", t.config.Types, t.config.BatchSize, types.EventOrderAscending, t.cursor, ) + return evts, cursor, trace.Wrap(err) +} + +func splitRangeByDay(from, to time.Time) []time.Time { + // splitRangeByDay splits the range into days + var days []time.Time + for d := from; d.Before(to); d = d.AddDate(0, 0, 1) { + days = append(days, d) + } + return append(days, to) // add the last date } // pause sleeps for timeout seconds @@ -252,7 +321,7 @@ func (t *TeleportEventsWatcher) Events(ctx context.Context) (chan *TeleportEvent } // If there is still nothing, sleep - if len(t.batch) == 0 { + if len(t.batch) == 0 && t.nextCursor == "" { if t.config.ExitOnLastEvent { log.Info("All events are processed, exiting...") break @@ -284,7 +353,7 @@ func (t *TeleportEventsWatcher) Events(ctx context.Context) (chan *TeleportEvent // If there is still nothing new on current page, sleep if t.pos >= len(t.batch) { - if t.config.ExitOnLastEvent { + if t.config.ExitOnLastEvent && t.nextCursor == "" { log.Info("All events are processed, exiting...") break } @@ -346,3 +415,15 @@ func (t *TeleportEventsWatcher) UpsertLock(ctx context.Context, user string, log return t.client.UpsertLock(ctx, lock) } + +func (t *TeleportEventsWatcher) getWindowStartTime() time.Time { + t.windowStartTimeMu.Lock() + defer t.windowStartTimeMu.Unlock() + return t.windowStartTime +} + +func (t *TeleportEventsWatcher) setWindowStartTime(time time.Time) { + t.windowStartTimeMu.Lock() + defer t.windowStartTimeMu.Unlock() + t.windowStartTime = time +} diff --git a/integrations/event-handler/teleport_events_watcher_test.go b/integrations/event-handler/teleport_events_watcher_test.go index 99b01318f6749..601dc30db1b2a 100644 --- a/integrations/event-handler/teleport_events_watcher_test.go +++ b/integrations/event-handler/teleport_events_watcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "strconv" "sync" "testing" @@ -24,12 +25,12 @@ import ( "github.com/gravitational/trace" "github.com/stretchr/testify/require" - "golang.org/x/net/context" "github.com/gravitational/teleport/api/client/proto" auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/events" + libevents "github.com/gravitational/teleport/lib/events" ) // mockTeleportEventWatcher is Teleport client mock @@ -73,8 +74,19 @@ func (c *mockTeleportEventWatcher) SearchEvents(ctx context.Context, fromUTC, to endIndex = len(c.events) } - // Get the next page - e := c.events[startIndex:endIndex] + // validate time + var e []events.AuditEvent + for i, event := range c.events { + if i < startIndex { + continue + } + if i >= endIndex { + break + } + if event.GetTime().After(fromUTC) && event.GetTime().Before(toUTC) { + e = append(e, event) + } + } // Check if we finished the page var lastKey string @@ -121,16 +133,23 @@ func (c *mockTeleportEventWatcher) Close() error { return nil } -func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClient) *TeleportEventsWatcher { +func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClient, startTime time.Time, skipEventTypesRaw []string) *TeleportEventsWatcher { + skipEventTypes := map[string]struct{}{} + for _, eventType := range skipEventTypesRaw { + skipEventTypes[eventType] = struct{}{} + } client := &TeleportEventsWatcher{ client: eventsClient, pos: -1, config: &StartCmdConfig{ IngestConfig: IngestConfig{ - BatchSize: 5, - ExitOnLastEvent: true, + BatchSize: 5, + ExitOnLastEvent: true, + SkipEventTypes: skipEventTypes, + SkipSessionTypesRaw: skipEventTypesRaw, }, }, + windowStartTime: startTime, } return client @@ -144,7 +163,9 @@ func TestEvents(t *testing.T) { for i := 0; i < 20; i++ { testAuditEvents[i] = &events.UserCreate{ Metadata: events.Metadata{ - ID: strconv.Itoa(i), + ID: strconv.Itoa(i), + Time: time.Now(), + Type: libevents.UserUpdatedEvent, }, } } @@ -153,7 +174,7 @@ func TestEvents(t *testing.T) { // Add the 20 events to a mock event watcher. mockEventWatcher := &mockTeleportEventWatcher{events: testAuditEvents} - client := newTeleportEventWatcher(t, mockEventWatcher) + client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-48*time.Hour), nil) // Start the events goroutine chEvt, chErr := client.Events(ctx) @@ -170,7 +191,7 @@ func TestEvents(t *testing.T) { case err := <-chErr: t.Fatalf("Received unexpected error from error channel: %v", err) return - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } } @@ -179,49 +200,15 @@ func TestEvents(t *testing.T) { select { case _, ok := <-chEvt: require.False(t, ok, "Events channel should be closed") - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } -} - -func TestEventsNoExit(t *testing.T) { - ctx := context.Background() - - // create fake audit events with ids 0-19 - testAuditEvents := make([]events.AuditEvent, 20) - for i := 0; i < 20; i++ { - testAuditEvents[i] = &events.UserCreate{ - Metadata: events.Metadata{ - ID: strconv.Itoa(i), - }, - } - } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - // Add the 20 events to a mock event watcher. - mockEventWatcher := &mockTeleportEventWatcher{events: testAuditEvents} - client := newTeleportEventWatcher(t, mockEventWatcher) - client.config.ExitOnLastEvent = false - - // Start the events goroutine - chEvt, chErr := client.Events(ctx) - - // Collect all 20 events - for i := 0; i < 20; i++ { - select { - case event, ok := <-chEvt: - require.NotNil(t, event, "Expected an event but got nil. i: %v", i) - require.Equal(t, strconv.Itoa(i), event.ID) - if !ok { - return - } - case err := <-chErr: - t.Fatalf("Received unexpected error from error channel: %v", err) - return - case <-time.After(100 * time.Millisecond): - t.Fatalf("No events received within deadline") - } + select { + case _, ok := <-chErr: + require.False(t, ok, "Error channel should be closed") + case <-time.After(2 * time.Second): + t.Fatalf("No events received within deadline") } // Events goroutine should return next page errors @@ -229,10 +216,9 @@ func TestEventsNoExit(t *testing.T) { mockEventWatcher.setSearchEventsError(mockErr) select { - case err, ok := <-chErr: - require.True(t, ok, "Channel unexpectedly close") - require.ErrorIs(t, err, mockErr) - case <-time.After(100 * time.Millisecond): + case err := <-chErr: + require.Error(t, mockErr, err) + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } @@ -240,14 +226,14 @@ func TestEventsNoExit(t *testing.T) { select { case _, ok := <-chEvt: require.False(t, ok, "Events channel should be closed") - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } select { case _, ok := <-chErr: require.False(t, ok, "Error channel should be closed") - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } } @@ -260,7 +246,9 @@ func TestUpdatePage(t *testing.T) { for i := 0; i < 10; i++ { testAuditEvents[i] = &events.UserCreate{ Metadata: events.Metadata{ - ID: strconv.Itoa(i), + ID: strconv.Itoa(i), + Time: time.Now(), + Type: libevents.UserUpdatedEvent, }, } } @@ -268,7 +256,7 @@ func TestUpdatePage(t *testing.T) { defer cancel() mockEventWatcher := &mockTeleportEventWatcher{} - client := newTeleportEventWatcher(t, mockEventWatcher) + client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-1*time.Hour), nil) client.config.ExitOnLastEvent = false // Start the events goroutine @@ -288,7 +276,7 @@ func TestUpdatePage(t *testing.T) { case err := <-chErr: t.Fatalf("Received unexpected error from error channel: %v", err) return - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } } @@ -299,7 +287,7 @@ func TestUpdatePage(t *testing.T) { t.Fatalf("Events channel should be open") case <-chErr: t.Fatalf("Events channel should be open") - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): } // Update the event watcher with the full page of events an collect. @@ -315,7 +303,7 @@ func TestUpdatePage(t *testing.T) { case err := <-chErr: t.Fatalf("Received unexpected error from error channel: %v", err) return - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } } @@ -326,7 +314,7 @@ func TestUpdatePage(t *testing.T) { t.Fatalf("Events channel should be open") case <-chErr: t.Fatalf("Events channel should be open") - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): } // Add another partial page and collect the events @@ -342,7 +330,7 @@ func TestUpdatePage(t *testing.T) { case err := <-chErr: t.Fatalf("Received unexpected error from error channel: %v", err) return - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } } @@ -352,10 +340,9 @@ func TestUpdatePage(t *testing.T) { mockEventWatcher.setSearchEventsError(mockErr) select { - case err, ok := <-chErr: - require.True(t, ok, "Channel unexpectedly close") - require.ErrorIs(t, err, mockErr) - case <-time.After(100 * time.Millisecond): + case err := <-chErr: + require.Error(t, mockErr, err) + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } @@ -363,14 +350,14 @@ func TestUpdatePage(t *testing.T) { select { case _, ok := <-chEvt: require.False(t, ok, "Events channel should be closed") - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } select { case _, ok := <-chErr: require.False(t, ok, "Error channel should be closed") - case <-time.After(100 * time.Millisecond): + case <-time.After(2 * time.Second): t.Fatalf("No events received within deadline") } } @@ -451,3 +438,180 @@ func TestValidateConfig(t *testing.T) { }) } } + +func Test_splitRangeByDay(t *testing.T) { + type args struct { + from time.Time + to time.Time + } + tests := []struct { + name string + args args + want []time.Time + }{ + { + name: "Same day", + args: args{ + from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 1, 23, 59, 59, 0, time.UTC), + }, + want: []time.Time{ + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 1, 23, 59, 59, 0, time.UTC), + }, + }, + { + name: "Two days", + args: args{ + from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 2, 23, 59, 59, 0, time.UTC), + }, + want: []time.Time{ + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 2, 23, 59, 59, 0, time.UTC), + }, + }, + { + name: "week", + args: args{ + from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 7, 23, 59, 59, 0, time.UTC), + }, + want: []time.Time{ + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 5, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 6, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 7, 0, 0, 0, 0, time.UTC), + time.Date(2021, 1, 7, 23, 59, 59, 0, time.UTC), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := splitRangeByDay(tt.args.from, tt.args.to) + require.Equal(t, tt.want, got) + }) + } +} + +func TestEventsWithWindowSkip(t *testing.T) { + ctx := context.Background() + + // create fake audit events with ids 0-29 + testAuditEvents := make([]events.AuditEvent, 30) + for i := 0; i < 10; i++ { + testAuditEvents[i] = &events.UserCreate{ + Metadata: events.Metadata{ + ID: strconv.Itoa(i), + Time: time.Now(), + Type: libevents.UserUpdatedEvent, + }, + } + } + for i := 10; i < 20; i++ { + testAuditEvents[i] = &events.UserCreate{ + Metadata: events.Metadata{ + ID: strconv.Itoa(i), + Time: time.Now(), + Type: libevents.UserCreateEvent, + }, + } + } + + for i := 20; i < 30; i++ { + testAuditEvents[i] = &events.UserCreate{ + Metadata: events.Metadata{ + ID: strconv.Itoa(i), + Time: time.Now(), + Type: libevents.UserUpdatedEvent, + }, + } + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Add the 20 events to a mock event watcher. + mockEventWatcher := &mockTeleportEventWatcher{events: testAuditEvents} + client := newTeleportEventWatcher(t, mockEventWatcher, time.Now().Add(-48*time.Hour), []string{libevents.UserCreateEvent}) + + // Start the events goroutine + chEvt, chErr := client.Events(ctx) + + // Collect all 10 first events + for i := 0; i < 10; i++ { + select { + case event, ok := <-chEvt: + require.NotNil(t, event, "Expected an event but got nil. i: %v", i) + require.Equal(t, strconv.Itoa(i), event.ID) + if !ok { + return + } + case err := <-chErr: + t.Fatalf("Received unexpected error from error channel: %v", err) + return + case <-time.After(2 * time.Second): + t.Fatalf("No events received within deadline") + } + } + + for i := 20; i < 30; i++ { + select { + case event, ok := <-chEvt: + require.NotNil(t, event, "Expected an event but got nil. i: %v", i) + require.Equal(t, strconv.Itoa(i), event.ID) + if !ok { + return + } + case err := <-chErr: + t.Fatalf("Received unexpected error from error channel: %v", err) + return + case <-time.After(2 * time.Second): + t.Fatalf("No events received within deadline") + } + } + + // Both channels should be closed once the last event is reached. + select { + case _, ok := <-chEvt: + require.False(t, ok, "Events channel should be closed") + case <-time.After(2 * time.Second): + t.Fatalf("No events received within deadline") + } + + select { + case _, ok := <-chErr: + require.False(t, ok, "Error channel should be closed") + case <-time.After(2 * time.Second): + t.Fatalf("No events received within deadline") + } + + // Events goroutine should return next page errors + mockErr := trace.Errorf("error") + mockEventWatcher.setSearchEventsError(mockErr) + + select { + case err := <-chErr: + require.Error(t, mockErr, err) + case <-time.After(2 * time.Second): + t.Fatalf("No events received within deadline") + } + + // Both channels should be closed + select { + case _, ok := <-chEvt: + require.False(t, ok, "Events channel should be closed") + case <-time.After(2 * time.Second): + t.Fatalf("No events received within deadline") + } + + select { + case _, ok := <-chErr: + require.False(t, ok, "Error channel should be closed") + case <-time.After(2 * time.Second): + t.Fatalf("No events received within deadline") + } +}