Skip to content
This repository has been archived by the owner on Jun 4, 2024. It is now read-only.

Commit

Permalink
Event-handler: call SearchUnstructuredEvents with smaller windows
Browse files Browse the repository at this point in the history
This PR aims to reduce the windows sent from `SearchUnstructuredEvents`.
StartTime is never update so it's kept with its initial value which
causes problems to the event handler sending windows that include more
than 1Gb of data when using the Athena backend which causes failures.

Signed-off-by: Tiago Silva <[email protected]>
  • Loading branch information
tigrato committed May 13, 2024
1 parent 819f62a commit 37db668
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 41 deletions.
16 changes: 15 additions & 1 deletion event-handler/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func (a *App) Run(ctx context.Context) error {
a.SpawnCriticalJob(a.sessionEventsJob)
<-a.Process.Done()

lastWindow := a.EventWatcher.getWindowStartTime()
a.State.SetLastWindowTime(&lastWindow)

return a.Err()
}

Expand Down Expand Up @@ -179,7 +182,18 @@ func (a *App) init(ctx context.Context) error {
return trace.Wrap(err)
}

t, err := NewTeleportEventsWatcher(ctx, a.Config, *startTime, latestCursor, latestID)
lastWindowTime, err := s.GetLastWindowTime()
if err != nil {
return trace.Wrap(err)
}
// if lastWindowTime is nil, set it to startTime
// lastWindowTime is used to track the last window of events ingested
// and is updated on exit
if lastWindowTime == nil {
lastWindowTime = startTime
}

t, err := NewTeleportEventsWatcher(ctx, a.Config, *lastWindowTime, latestCursor, latestID)
if err != nil {
return trace.Wrap(err)
}
Expand Down
33 changes: 27 additions & 6 deletions event-handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (
// startTimeName is the start time variable name
startTimeName = "start_time"

// windowTimeName is the start time of the last window.
windowTimeName = "window_time"

// cursorName is the cursor variable name
cursorName = "cursor"

Expand Down Expand Up @@ -120,11 +123,25 @@ func createStorageDir(c *StartCmdConfig) (string, error) {

// GetStartTime gets current start time
func (s *State) GetStartTime() (*time.Time, error) {
if !s.dv.Has(startTimeName) {
return s.getTimeKey(startTimeName)
}

// SetStartTime sets current start time
func (s *State) SetStartTime(t *time.Time) error {
return s.setTimeKey(startTimeName, t)
}

// GetLastWindowTime gets current start time
func (s *State) GetLastWindowTime() (*time.Time, error) {
return s.getTimeKey(windowTimeName)
}

func (s *State) getTimeKey(keyName string) (*time.Time, error) {
if !s.dv.Has(keyName) {
return nil, nil
}

b, err := s.dv.Read(startTimeName)
b, err := s.dv.Read(keyName)
if err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -144,14 +161,18 @@ func (s *State) GetStartTime() (*time.Time, error) {
return &t, nil
}

// SetStartTime sets current start time
func (s *State) SetStartTime(t *time.Time) error {
func (s *State) setTimeKey(keyName string, t *time.Time) error {
if t == nil {
return s.dv.Write(startTimeName, []byte(""))
return s.dv.Write(keyName, []byte(""))
}

v := t.Truncate(time.Second).Format(time.RFC3339)
return s.dv.Write(startTimeName, []byte(v))
return s.dv.Write(keyName, []byte(v))
}

// SetLastWindowTime sets current start time of the last window used.
func (s *State) SetLastWindowTime(t *time.Time) error {
return s.setTimeKey(windowTimeName, t)
}

// GetCursor gets current cursor value
Expand Down
82 changes: 63 additions & 19 deletions event-handler/teleport_events_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"fmt"
"sync"
"time"

"github.com/gravitational/teleport/api/client"
Expand Down Expand Up @@ -69,15 +70,17 @@ type TeleportEventsWatcher struct {
batch []*TeleportEvent
// config is teleport config
config *StartCmdConfig
// startTime is event time frame start
startTime time.Time

// windowStartTime is event time frame start
windowStartTime time.Time
windowStartTimeMu sync.Mutex
}

// NewTeleportEventsWatcher builds Teleport client instance
func NewTeleportEventsWatcher(
ctx context.Context,
c *StartCmdConfig,
startTime time.Time,
windowStartTime time.Time,
cursor string,
id string,
) (*TeleportEventsWatcher, error) {
Expand Down Expand Up @@ -118,12 +121,12 @@ func NewTeleportEventsWatcher(
}

tc := TeleportEventsWatcher{
client: teleportClient,
pos: -1,
cursor: cursor,
config: c,
id: id,
startTime: startTime,
client: teleportClient,
pos: -1,
cursor: cursor,
config: c,
id: id,
windowStartTime: windowStartTime,
}

return &tc, nil
Expand Down Expand Up @@ -207,16 +210,45 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error {

// getEvents calls Teleport client and loads events
func (t *TeleportEventsWatcher) getEvents(ctx context.Context) ([]*auditlogpb.EventUnstructured, string, error) {
return t.client.SearchUnstructuredEvents(
ctx,
t.startTime,
time.Now().UTC(),
"default",
t.config.Types,
t.config.BatchSize,
types.EventOrderAscending,
t.cursor,
)
rangeSplitByDay := splitRangeByDay(t.getWindowStartTime(), time.Now().UTC())
for i := 1; i < len(rangeSplitByDay); i++ {
startTime := rangeSplitByDay[i-1]
endTime := rangeSplitByDay[i]
log.Debugf("Fetching events from %v to %v", startTime, endTime)
evts, cursor, err := t.client.SearchUnstructuredEvents(
ctx,
startTime,
endTime,
"default",
t.config.Types,
t.config.BatchSize,
types.EventOrderAscending,
t.cursor,
)
if err != nil {
return nil, "", trace.Wrap(err)
}

// if no events are found, the cursor is out of the range [startTime, endTime]
// and it's the last complete day, update start time to the next day.
if len(evts) == 0 && i < len(rangeSplitByDay)-1 {
log.Infof("No events found for the range %v to %v", startTime, endTime)
t.setWindowStartTime(endTime)
continue
}
// if any events are found, return them
return evts, cursor, nil
}
return nil, t.cursor, nil
}

func splitRangeByDay(from, to time.Time) []time.Time {
// splitRangeByDay splits the range into days
var days []time.Time
for d := from; d.Before(to); d = d.AddDate(0, 0, 1) {
days = append(days, d)
}
return append(days, to) // add the last date
}

// pause sleeps for timeout seconds
Expand Down Expand Up @@ -345,3 +377,15 @@ func (t *TeleportEventsWatcher) UpsertLock(ctx context.Context, user string, log

return t.client.UpsertLock(ctx, lock)
}

func (t *TeleportEventsWatcher) getWindowStartTime() time.Time {
t.windowStartTimeMu.Lock()
defer t.windowStartTimeMu.Unlock()
return t.windowStartTime
}

func (t *TeleportEventsWatcher) setWindowStartTime(time time.Time) {
t.windowStartTimeMu.Lock()
defer t.windowStartTimeMu.Unlock()
t.windowStartTime = time
}
Loading

0 comments on commit 37db668

Please sign in to comment.