Skip to content
This repository has been archived by the owner on Jun 4, 2024. It is now read-only.

Commit

Permalink
Event-handler: call SearchUnstructuredEvents with smaller windows
Browse files Browse the repository at this point in the history
This PR aims to reduce the windows sent from `SearchUnstructuredEvents`.
StartTime is never update so it's kept with its initial value which
causes problems to the event handler sending windows that include more
than 1Gb of data when using the Athena backend which causes failures.

Signed-off-by: Tiago Silva <[email protected]>
  • Loading branch information
tigrato committed May 13, 2024
1 parent 819f62a commit 68aeb02
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 25 deletions.
49 changes: 39 additions & 10 deletions event-handler/teleport_events_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,16 +207,45 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error {

// getEvents calls Teleport client and loads events
func (t *TeleportEventsWatcher) getEvents(ctx context.Context) ([]*auditlogpb.EventUnstructured, string, error) {
return t.client.SearchUnstructuredEvents(
ctx,
t.startTime,
time.Now().UTC(),
"default",
t.config.Types,
t.config.BatchSize,
types.EventOrderAscending,
t.cursor,
)
rangeSplitByDay := splitRangeByDay(t.startTime, time.Now().UTC())
for i := 1; i < len(rangeSplitByDay); i++ {
startTime := rangeSplitByDay[i-1]
endTime := rangeSplitByDay[i]
log.Debugf("Fetching events from %v to %v", startTime, endTime)
evts, cursor, err := t.client.SearchUnstructuredEvents(
ctx,
startTime,
endTime,
"default",
t.config.Types,
t.config.BatchSize,
types.EventOrderAscending,
t.cursor,
)
if err != nil {
return nil, "", trace.Wrap(err)
}

// if no events are found, the cursor is out of the range [startTime, endTime]
// and it's the last complete day, update start time to the next day.
if len(evts) == 0 && i < len(rangeSplitByDay)-1 {
log.Infof("No events found for the range %v to %v", startTime, endTime)
t.startTime = endTime
continue
}
// if any events are found, return them
return evts, cursor, nil
}
return nil, t.cursor, nil
}

func splitRangeByDay(from, to time.Time) []time.Time {
// splitRangeByDay splits the range into days
var days []time.Time
for d := from; d.Before(to); d = d.AddDate(0, 0, 1) {
days = append(days, d)
}
return append(days, to) // add the last date
}

// pause sleeps for timeout seconds
Expand Down
90 changes: 75 additions & 15 deletions event-handler/teleport_events_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"strconv"
"sync"
"testing"
Expand All @@ -28,7 +29,6 @@ import (
"github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/trace"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)

// mockTeleportEventWatcher is Teleport client mock
Expand Down Expand Up @@ -121,6 +121,7 @@ func (c *mockTeleportEventWatcher) Close() error {
}

func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClient) *TeleportEventsWatcher {

client := &TeleportEventsWatcher{
client: eventsClient,
pos: -1,
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestEvents(t *testing.T) {
case err := <-chErr:
t.Fatalf("Received unexpected error from error channel: %v", err)
return
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}
}
Expand All @@ -178,14 +179,14 @@ func TestEvents(t *testing.T) {
select {
case _, ok := <-chEvt:
require.False(t, ok, "Events channel should be closed")
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}

select {
case _, ok := <-chErr:
require.False(t, ok, "Error channel should be closed")
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}

Expand All @@ -196,22 +197,22 @@ func TestEvents(t *testing.T) {
select {
case err := <-chErr:
require.Error(t, mockErr, err)
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}

// Both channels should be closed
select {
case _, ok := <-chEvt:
require.False(t, ok, "Events channel should be closed")
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}

select {
case _, ok := <-chErr:
require.False(t, ok, "Error channel should be closed")
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}
}
Expand Down Expand Up @@ -252,7 +253,7 @@ func TestUpdatePage(t *testing.T) {
case err := <-chErr:
t.Fatalf("Received unexpected error from error channel: %v", err)
return
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}
}
Expand All @@ -263,7 +264,7 @@ func TestUpdatePage(t *testing.T) {
t.Fatalf("Events channel should be open")
case <-chErr:
t.Fatalf("Events channel should be open")
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
}

// Update the event watcher with the full page of events an collect.
Expand All @@ -279,7 +280,7 @@ func TestUpdatePage(t *testing.T) {
case err := <-chErr:
t.Fatalf("Received unexpected error from error channel: %v", err)
return
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}
}
Expand All @@ -290,7 +291,7 @@ func TestUpdatePage(t *testing.T) {
t.Fatalf("Events channel should be open")
case <-chErr:
t.Fatalf("Events channel should be open")
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
}

// Add another partial page and collect the events
Expand All @@ -306,7 +307,7 @@ func TestUpdatePage(t *testing.T) {
case err := <-chErr:
t.Fatalf("Received unexpected error from error channel: %v", err)
return
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}
}
Expand All @@ -318,22 +319,22 @@ func TestUpdatePage(t *testing.T) {
select {
case err := <-chErr:
require.Error(t, mockErr, err)
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}

// Both channels should be closed
select {
case _, ok := <-chEvt:
require.False(t, ok, "Events channel should be closed")
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}

select {
case _, ok := <-chErr:
require.False(t, ok, "Error channel should be closed")
case <-time.After(100 * time.Millisecond):
case <-time.After(2 * time.Second):
t.Fatalf("No events received within deadline")
}
}
Expand Down Expand Up @@ -414,3 +415,62 @@ func TestValidateConfig(t *testing.T) {
})
}
}

func Test_splitRangeByDay(t *testing.T) {
type args struct {
from time.Time
to time.Time
}
tests := []struct {
name string
args args
want []time.Time
}{
{
name: "Same day",
args: args{
from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
to: time.Date(2021, 1, 1, 23, 59, 59, 0, time.UTC),
},
want: []time.Time{
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 1, 23, 59, 59, 0, time.UTC),
},
},
{
name: "Two days",
args: args{
from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
to: time.Date(2021, 1, 2, 23, 59, 59, 0, time.UTC),
},
want: []time.Time{
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 2, 23, 59, 59, 0, time.UTC),
},
},
{
name: "week",
args: args{
from: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
to: time.Date(2021, 1, 7, 23, 59, 59, 0, time.UTC),
},
want: []time.Time{
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 5, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 6, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 7, 0, 0, 0, 0, time.UTC),
time.Date(2021, 1, 7, 23, 59, 59, 0, time.UTC),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := splitRangeByDay(tt.args.from, tt.args.to)
require.Equal(t, tt.want, got)
})
}
}

0 comments on commit 68aeb02

Please sign in to comment.