Skip to content

Commit

Permalink
Allow stream custom maxsize per batch (#2063)
Browse files Browse the repository at this point in the history
  • Loading branch information
simon28082 authored Oct 25, 2024
1 parent ca7c9d4 commit 10a09e6
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
with:
go-version: ${{ env.GOVERSION }}
- name: golang-lint
uses: golangci/golangci-lint-action@v6
uses: golangci/golangci-lint-action@v6.1.1
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: latest
Expand Down
11 changes: 10 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ type Stream struct {
// Note: Calls to ChooseKey are concurrent.
ChooseKey func(item *Item) bool

// MaxSize is the maximum allowed size of a stream batch. This is a soft limit
// as a single list that is still over the limit will have to be sent as is since it
// cannot be split further. This limit prevents the framework from creating batches
// so big that sending them causes issues (e.g. running into the max size gRPC limit).
// If necessary, set it before the Stream starts synchronisation.
// This setting is not concurrency-safe.
MaxSize uint64

// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
// is up to the caller to iterate over the versions and generate zero, one or more KVs. It
// is expected that the user would advance the iterator to go through the versions of the
Expand Down Expand Up @@ -315,7 +323,7 @@ func (st *Stream) streamKVs(ctx context.Context) error {
// Send the batch immediately if it already exceeds the maximum allowed size.
// If the size of the batch exceeds st.MaxSize, break from the loop to
// avoid creating a batch that is so big that certain limits are reached.
if batch.LenNoPadding() > int(maxStreamSize) {
if uint64(batch.LenNoPadding()) > st.MaxSize {
break loop
}
select {
Expand Down Expand Up @@ -452,6 +460,7 @@ func (db *DB) newStream() *Stream {
db: db,
NumGo: db.opt.NumGoroutines,
LogPrefix: "Badger.Stream",
MaxSize: maxStreamSize,
}
}

Expand Down
61 changes: 61 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,67 @@ func TestStream(t *testing.T) {
require.NoError(t, db.Close())
}

// TestStreamMaxSize verifies that a newly created Stream takes the package-level
// maxStreamSize as its default MaxSize, and that streaming still delivers every
// key/value pair after the caller overrides MaxSize on the Stream.
//
// The test writes 3 * 1e6 entries, so it only runs when the manual flag is set.
func TestStreamMaxSize(t *testing.T) {
	if !*manual {
		// t.Skip stops the test on its own; no explicit return is required.
		t.Skip("Skipping test meant to be run manually.")
	}

	// Lower maxStreamSize to 1MB for the duration of the test and restore it on exit.
	// The default-value assertion below checks that the stream was created with this value.
	originalMaxStreamSize := maxStreamSize
	maxStreamSize = 1 << 20
	defer func() {
		maxStreamSize = originalMaxStreamSize
	}()

	testSize := int(1e6)
	dir, err := os.MkdirTemp("", "badger-big-test")
	require.NoError(t, err)
	defer removeDir(dir)

	db, err := OpenManaged(DefaultOptions(dir))
	require.NoError(t, err)

	// Write testSize entries under each of the three prefixes at version 5.
	wb := db.NewWriteBatchAt(5)
	for _, prefix := range []string{"p0", "p1", "p2"} {
		for i := 1; i <= testSize; i++ {
			require.NoError(t, wb.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i))))
		}
	}
	require.NoError(t, wb.Flush())

	stream := db.NewStreamAt(math.MaxUint64)
	stream.LogPrefix = "Testing"
	c := &collector{}
	stream.Send = c.Send

	// A fresh stream must start out with the package-level default.
	require.Equal(t, maxStreamSize, stream.MaxSize)

	// Override the per-stream limit (50MB) before synchronisation starts.
	stream.MaxSize = 1024 * 1024 * 50

	// Test case 1. Retrieve everything.
	err = stream.Orchestrate(ctxb)
	require.NoError(t, err)
	require.Equal(t, 3*testSize, len(c.kv), "Expected %d. Got: %d", 3*testSize, len(c.kv))

	// Every collected value must match its key, and each prefix must be fully represented.
	perPrefix := make(map[string]int)
	for _, kv := range c.kv {
		prefix, ki := keyToInt(kv.Key)
		expected := value(ki)
		require.Equal(t, expected, kv.Value)
		perPrefix[prefix]++
	}
	require.Equal(t, 3, len(perPrefix))
	for pred, n := range perPrefix {
		require.Equal(t, testSize, n, "Count mismatch for pred: %s", pred)
	}
	require.NoError(t, db.Close())
}

func TestStreamWithThreadId(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err)
Expand Down

0 comments on commit 10a09e6

Please sign in to comment.