Skip to content
This repository has been archived by the owner on Jun 4, 2024. It is now read-only.

Commit

Permalink
advance to next page with skip support
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrato committed May 14, 2024
1 parent bbaff20 commit b043444
Showing 1 changed file with 48 additions and 26 deletions.
74 changes: 48 additions & 26 deletions event-handler/teleport_events_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,43 +153,28 @@ func (t *TeleportEventsWatcher) flipPage() bool {
// fetch fetches the page and sets the position to the event after latest known
func (t *TeleportEventsWatcher) fetch(ctx context.Context) error {
log := logger.Get(ctx)
b, nextCursor, err := t.getEvents(ctx)
// Zero batch
t.batch = make([]*TeleportEvent, 0, t.config.BatchSize)
nextCursor, err := t.getEvents(ctx)
if err != nil {
return trace.Wrap(err)
}

// Zero batch
t.batch = make([]*TeleportEvent, 0, len(b))

// Save next cursor
t.nextCursor = nextCursor

// Mark position as unresolved (the page is empty)
t.pos = -1

log.WithField("cursor", t.cursor).WithField("next", nextCursor).WithField("len", len(b)).Debug("Fetched page")
log.WithField("cursor", t.cursor).WithField("next", nextCursor).WithField("len", len(t.batch)).Debug("Fetched page")

// Page is empty: do nothing, return
if len(b) == 0 {
if len(t.batch) == 0 {
return nil
}

pos := 0

// Convert batch to TeleportEvent
for _, e := range b {
if _, ok := t.config.SkipEventTypes[e.Type]; ok {
log.WithField("event", e).Debug("Skipping event")
continue
}
evt, err := NewTeleportEvent(e, t.cursor)
if err != nil {
return trace.Wrap(err)
}

t.batch = append(t.batch, evt)
}

// If last known id is not empty, let's try to find it's pos
if t.id != "" {
for i, e := range t.batch {
Expand All @@ -212,28 +197,65 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error {
// It returns a slice of events, a cursor for the next page and an error.
// If the cursor is out of the range, it advances the windowStartTime to the next day.
// It only advances the windowStartTime if no events are found until the last complete day.
func (t *TeleportEventsWatcher) getEvents(ctx context.Context) ([]*auditlogpb.EventUnstructured, string, error) {
func (t *TeleportEventsWatcher) getEvents(ctx context.Context) (string, error) {
rangeSplitByDay := splitRangeByDay(t.getWindowStartTime(), time.Now().UTC())
for i := 1; i < len(rangeSplitByDay); i++ {
startTime := rangeSplitByDay[i-1]
endTime := rangeSplitByDay[i]
log.Debugf("Fetching events from %v to %v", startTime, endTime)
evts, cursor, err := t.getEventsInWindow(ctx, startTime, endTime)
if err != nil {
return nil, "", trace.Wrap(err)
return "", trace.Wrap(err)
}

// Convert batch to TeleportEvent
for _, e := range evts {
if _, ok := t.config.SkipEventTypes[e.Type]; ok {
log.WithField("event", e).Debug("Skipping event")
continue
}
evt, err := NewTeleportEvent(e, t.cursor)
if err != nil {
return "", trace.Wrap(err)
}

t.batch = append(t.batch, evt)
}

// if no events are found, the cursor is out of the range [startTime, endTime]
// and it's the last complete day, update start time to the next day.
if len(evts) == 0 && i < len(rangeSplitByDay)-1 {
log.Infof("No events found for the range %v to %v", startTime, endTime)
if t.canSkipToNextWindow(i, rangeSplitByDay, cursor) {
log.Infof("No new events found for the range %v to %v", startTime, endTime)
t.setWindowStartTime(endTime)
continue
}
// if any events are found, return them
return evts, cursor, nil
return cursor, nil
}
return t.cursor, nil
}

func (t *TeleportEventsWatcher) canSkipToNextWindow(i int, rangeSplitByDay []time.Time, cursor string) bool {
if len(t.batch) == 0 && i < len(rangeSplitByDay)-1 {
log.Infof("No events found for the range %v to %v", rangeSplitByDay[i-1], rangeSplitByDay[i])
return true
}
pos := 0
// If last known id is not empty, let's try to find if all events are already processed
// and if we can skip to next page
if t.id != "" {
for i, e := range t.batch {
if e.ID == t.id {
pos = i + 1
}
}
}

if cursor == "" && i < len(rangeSplitByDay)-1 && pos >= len(t.batch) {
log.WithField("pos", pos).WithField("len", len(t.batch)).Infof("No new events found for the range %v to %v", rangeSplitByDay[i-1], rangeSplitByDay[i])
return true
}
return nil, t.cursor, nil
return false
}

// getEvents calls Teleport client and loads events from the audit log.
Expand Down

0 comments on commit b043444

Please sign in to comment.