Skip to content

Commit

Permalink
[v7] Fix DynamoDB getAllRecords logic when 1MB query limit is reached (
Browse files Browse the repository at this point in the history
  • Loading branch information
smallinsky authored Mar 9, 2022
1 parent 18ad414 commit ec690f2
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 5 deletions.
13 changes: 9 additions & 4 deletions lib/backend/dynamo/dynamodbbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ func (b *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte,
if len(endKey) == 0 {
return nil, trace.BadParameter("missing parameter endKey")
}
if limit <= 0 {
limit = backend.DefaultRangeLimit
}
result, err := b.getAllRecords(ctx, startKey, endKey, limit)
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -391,7 +394,9 @@ func (b *Backend) getAllRecords(ctx context.Context, startKey []byte, endKey []b
return nil, trace.Wrap(err)
}
result.records = append(result.records, re.records...)
if len(result.records) >= limit || len(re.lastEvaluatedKey) == 0 {
// If the limit was exceeded or there are no more records to fetch return the current result
// otherwise updated lastEvaluatedKey and proceed with obtaining new records.
if (limit != 0 && len(result.records) >= limit) || len(re.lastEvaluatedKey) == 0 {
if len(result.records) == backend.DefaultRangeLimit {
b.Warnf("Range query hit backend limit. (this is a bug!) startKey=%q,limit=%d", startKey, backend.DefaultRangeLimit)
}
Expand Down Expand Up @@ -744,12 +749,12 @@ func (b *Backend) getRecords(ctx context.Context, startKey, endKey string, limit

// isExpired returns 'true' if the given object (record) has a TTL and
// it's due.
func (r *record) isExpired() bool {
func (r *record) isExpired(now time.Time) bool {
if r.Expires == nil {
return false
}
expiryDateUTC := time.Unix(*r.Expires, 0).UTC()
return time.Now().UTC().After(expiryDateUTC)
return now.UTC().After(expiryDateUTC)
}

func removeDuplicates(elements []record) []record {
Expand Down Expand Up @@ -863,7 +868,7 @@ func (b *Backend) getKey(ctx context.Context, key []byte) (*record, error) {
return nil, trace.WrapWithMessage(err, "%q is not found", string(key))
}
// Check if key expired, if expired delete it
if r.isExpired() {
if r.isExpired(b.clock.Now()) {
if err := b.deleteKey(ctx, key); err != nil {
b.Warnf("Failed deleting expired key %q: %v", key, err)
}
Expand Down
17 changes: 16 additions & 1 deletion lib/backend/dynamo/dynamodbbk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

"github.com/jonboulle/clockwork"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/test"
"github.com/gravitational/teleport/lib/utils"
Expand All @@ -48,17 +50,22 @@ var _ = check.Suite(&DynamoDBSuite{})

func (s *DynamoDBSuite) SetUpSuite(c *check.C) {
s.tableName = "teleport.dynamo.test"
clock := clockwork.NewFakeClock()
newBackend := func() (backend.Backend, error) {
return New(context.Background(), map[string]interface{}{
bk, err := New(context.Background(), map[string]interface{}{
"table_name": s.tableName,
"poll_stream_period": 300 * time.Millisecond,
})
bk.clock = clock
return bk, err
}

bk, err := newBackend()
c.Assert(err, check.IsNil)
s.bk = bk.(*Backend)
s.suite.B = s.bk
s.suite.NewBackend = newBackend
s.suite.Clock = clock
}

func (s *DynamoDBSuite) TearDownSuite(c *check.C) {
Expand Down Expand Up @@ -103,3 +110,11 @@ func (s *DynamoDBSuite) TestWatchersClose(c *check.C) {
func (s *DynamoDBSuite) TestLocking(c *check.C) {
s.suite.Locking(c, s.bk)
}

func (s *DynamoDBSuite) TestFetchLimit(c *check.C) {
s.suite.FetchLimit(c)
}

func (s *DynamoDBSuite) TestLimit(c *check.C) {
s.suite.Limit(c)
}
58 changes: 58 additions & 0 deletions lib/backend/test/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package test
import (
"context"
"encoding/hex"
"fmt"
"math/rand"
"sync/atomic"
"time"
Expand Down Expand Up @@ -831,3 +832,60 @@ func verifyExpireTimestampsIncreasing(c *check.C, obtained, expected []backend.E
}
}
}

func (s *BackendSuite) FetchLimit(c *check.C) {
prefix := MakePrefix()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Allocate 65KB buffer.
buff := make([]byte, 1<<16)
itemsCount := 20
// Fill the backend with events that total size is greater than 1MB (65KB * 20 > 1MB).
for i := 0; i < itemsCount; i++ {
item := &backend.Item{Key: prefix(fmt.Sprintf("/db/database%d", i)), Value: buff}
_, err := s.B.Put(ctx, *item)
c.Assert(err, check.IsNil)
}

result, err := s.B.GetRange(ctx, prefix("/db"), backend.RangeEnd(prefix("/db")), backend.NoLimit)
c.Assert(err, check.IsNil)
c.Assert(len(result.Items), check.Equals, 20)
}

// Limit tests limit.
func (s *BackendSuite) Limit(c *check.C) {
prefix := MakePrefix()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

item := &backend.Item{
Key: prefix("/db/database_tail_item"),
Value: []byte("data"),
Expires: s.Clock.Now().Add(time.Minute),
}
_, err := s.B.Put(ctx, *item)
c.Assert(err, check.IsNil)
for i := 0; i < 10; i++ {
item := &backend.Item{
Key: prefix(fmt.Sprintf("/db/database%d", i)),
Value: []byte("data"),
Expires: s.Clock.Now().Add(time.Second * 10),
}
_, err = s.B.Put(ctx, *item)
c.Assert(err, check.IsNil)
}
s.Clock.Advance(time.Second * 20)

item = &backend.Item{
Key: prefix("/db/database_head_item"),
Value: []byte("data"),
Expires: s.Clock.Now().Add(time.Minute),
}
_, err = s.B.Put(ctx, *item)
c.Assert(err, check.IsNil)

result, err := s.B.GetRange(ctx, prefix("/db"), backend.RangeEnd(prefix("/db")), 2)
c.Assert(err, check.IsNil)
c.Assert(len(result.Items), check.Equals, 2)
}

0 comments on commit ec690f2

Please sign in to comment.