Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DynamoDB getAllRecords logic when 1MB query limit is reached #10726

Merged
merged 11 commits into from
Mar 4, 2022
8 changes: 6 additions & 2 deletions lib/backend/dynamo/dynamodbbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,15 +383,19 @@ func (b *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte,

func (b *Backend) getAllRecords(ctx context.Context, startKey []byte, endKey []byte, limit int) (*getResult, error) {
var result getResult
limitRemaining := limit
// this code is being extra careful here not to introduce endless loop
// by some unfortunate series of events
for i := 0; i < backend.DefaultRangeLimit/100; i++ {
re, err := b.getRecords(ctx, prependPrefix(startKey), prependPrefix(endKey), limit, result.lastEvaluatedKey)
if limit > 0 {
limitRemaining = limit - len(result.records)
}
re, err := b.getRecords(ctx, prependPrefix(startKey), prependPrefix(endKey), limitRemaining, result.lastEvaluatedKey)
smallinsky marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, trace.Wrap(err)
}
result.records = append(result.records, re.records...)
if len(result.records) >= limit || len(re.lastEvaluatedKey) == 0 {
if (limit != 0 && len(result.records) >= limit) || len(re.lastEvaluatedKey) == 0 {
smallinsky marked this conversation as resolved.
Show resolved Hide resolved
if len(result.records) == backend.DefaultRangeLimit {
b.Warnf("Range query hit backend limit. (this is a bug!) startKey=%q,limit=%d", startKey, backend.DefaultRangeLimit)
}
Expand Down
7 changes: 4 additions & 3 deletions lib/backend/dynamo/dynamodbbk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"testing"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/test"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestDynamoDB(t *testing.T) {
if err != nil {
return nil, nil, trace.Wrap(err)
}
clock := clockwork.NewFakeClock()
clock := clockwork.NewFakeClockAt(time.Now())
uut.clock = clock
return uut, clock, nil
}
Expand Down
41 changes: 35 additions & 6 deletions lib/backend/test/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -156,6 +157,10 @@ func RunBackendComplianceSuite(t *testing.T, newBackend Constructor) {
t.Run("Mirror", func(t *testing.T) {
testMirror(t, newBackend)
})

t.Run("FetchLimit", func(t *testing.T) {
testFetchLimit(t, newBackend)
})
}

// RequireItems asserts that the supplied `actual` items collection matches
Expand Down Expand Up @@ -548,7 +553,7 @@ func testEvents(t *testing.T, newBackend Constructor) {
item = &backend.Item{
Key: prefix("c"),
Value: []byte("val"),
Expires: clock.Now().Add(1 * time.Second),
Expires: clock.Now().Add(3 * time.Second),
}
_, err = uut.Put(ctx, *item)
require.NoError(t, err)
Expand All @@ -561,17 +566,41 @@ func testEvents(t *testing.T, newBackend Constructor) {
e = requireEvent(t, watcher, types.OpPut, item.Key, eventTimeout)
require.Equal(t, item.Value, e.Item.Value)

// Wait a few seconds for the item to expire.
clock.Advance(3 * time.Second)

// Make sure item has been removed.
_, err = uut.Get(ctx, item.Key)
require.Error(t, err)
require.Eventually(t, func() bool {
_, err = uut.Get(ctx, item.Key)
return trace.IsNotFound(err)
}, time.Second*4, time.Millisecond*200, "Failed to ensure that item %q has been deleted", item.Key)
smallinsky marked this conversation as resolved.
Show resolved Hide resolved

// Make sure a DELETE event is emitted.
requireEvent(t, watcher, types.OpDelete, item.Key, 2*time.Second)
}

// testFetchLimit tests fetch max items size limit.
func testFetchLimit(t *testing.T, newBackend Constructor) {
uut, _, err := newBackend()
require.NoError(t, err)
defer func() { require.NoError(t, uut.Close()) }()

prefix := MakePrefix()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Allocate 4KB buffer.
smallinsky marked this conversation as resolved.
Show resolved Hide resolved
buff := make([]byte, 1<<16)
itemsCount := 20
// Fill the backend with events that total size is greater than 1MB (4KB * 20 > 1MB).
for i := 0; i < itemsCount; i++ {
item := &backend.Item{Key: prefix(fmt.Sprintf("/db/database%d", i)), Value: buff}
_, err = uut.Put(ctx, *item)
require.NoError(t, err)
}

result, err := uut.GetRange(ctx, prefix("/db"), backend.RangeEnd(prefix("/db")), backend.NoLimit)
require.NoError(t, err)
require.Equal(t, itemsCount, len(result.Items))
}

// requireEvent asserts that a given event type with the given key is emitted
// by a watcher within the supplied timeout, returning that event for further
// inspection if successful.
Expand Down