core/rawdb: fix flaw with deferred reads not being performed
holiman committed Aug 7, 2021
1 parent d6b40b8 commit 821c696
Showing 2 changed files with 63 additions and 1 deletion.
9 changes: 8 additions & 1 deletion core/rawdb/freezer_table.go
@@ -712,8 +712,8 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
 		size := int(offset2 - offset1)
 		// Crossing a file boundary?
 		if secondIndex.filenum != firstIndex.filenum {
-			// If we have unread data in the first file, we need to do that read now.
 			if unreadSize > 0 {
+				// If we have unread data in the first file, we need to do that read now.
 				if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil {
 					return nil, nil, err
 				}
@@ -722,6 +722,13 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
 			readStart = 0
 		}
 		if i > 0 && uint64(totalSize+size) > maxBytes {
+			// About to break out due to byte limit being exceeded. We don't
+			// read this last item, but we need to do the deferred reads now.
+			if unreadSize > 0 {
+				if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
+					return nil, nil, err
+				}
+			}
 			break
 		}
 		// Defer the read for later
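
To make the flaw easier to see outside the full freezer code, here is a minimal, self-contained sketch of the read-batching pattern the fix addresses. It is an illustrative stand-in, not the real freezerTable implementation: the indexEntry type, the retrieve helper, and its readData callback are simplified assumptions that only keep file numbers and byte sizes. The point it demonstrates is that reads are deferred and flushed when a file boundary is crossed, so an early break on the byte limit must also flush the pending batch, since otherwise the data deferred since the last boundary is silently dropped.

package main

import "fmt"

// indexEntry is a simplified stand-in for the freezer's per-item index entry:
// which data file an item lives in and how many bytes it occupies.
type indexEntry struct {
	filenum uint32
	size    int
}

// retrieve defers reads of consecutive items and flushes them in batches via
// readData. The key point mirrored from the fix above: when the byte limit
// forces an early break, the batch deferred so far must still be flushed.
func retrieve(items []indexEntry, maxBytes int, readData func(filenum uint32, n int)) {
	var (
		unreadSize int // bytes deferred but not yet read
		totalSize  int // bytes accepted into the result so far
	)
	for i, item := range items {
		// Crossing a file boundary? Flush the deferred read first.
		if i > 0 && item.filenum != items[i-1].filenum && unreadSize > 0 {
			readData(items[i-1].filenum, unreadSize)
			unreadSize = 0
		}
		// About to exceed the byte limit: don't take this item, but perform
		// the deferred reads now. This is the step the fix above adds.
		if i > 0 && totalSize+item.size > maxBytes {
			if unreadSize > 0 {
				readData(item.filenum, unreadSize)
			}
			return
		}
		// Defer the read for later.
		unreadSize += item.size
		totalSize += item.size
	}
	// Flush whatever is still pending after the last item.
	if unreadSize > 0 {
		readData(items[len(items)-1].filenum, unreadSize)
	}
}

func main() {
	items := []indexEntry{
		{filenum: 0, size: 10}, {filenum: 0, size: 10},
		{filenum: 1, size: 10}, {filenum: 1, size: 10},
	}
	retrieve(items, 35, func(filenum uint32, n int) {
		fmt.Printf("reading %d bytes from file %d\n", n, filenum)
	})
}

Running the sketch prints two reads: 20 bytes from file 0, flushed at the file boundary, and 10 bytes from file 1, where the second read only happens because of the byte-limit flush path.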
55 changes: 55 additions & 0 deletions core/rawdb/freezer_table_test.go
@@ -752,3 +752,58 @@ func TestSequentialRead(t *testing.T) {
 		f.Close()
 	}
 }
+
+// TestSequentialReadByteLimit does some more advanced tests on batch reads.
+// These tests check that when the byte limit hits, we correctly abort in time,
+// but also properly do all the deferred reads for the previous data, regardless
+// of whether the data crosses a file boundary or not.
+func TestSequentialReadByteLimit(t *testing.T) {
+	rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
+	fname := fmt.Sprintf("batchread-2-%d", rand.Uint64())
+	{ // Fill table
+		f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true)
+		if err != nil {
+			t.Fatal(err)
+		}
+		// Write 10 bytes 30 times,
+		// Splitting it at every 100 bytes (10 items)
+		for x := 0; x < 30; x++ {
+			data := getChunk(10, x)
+			f.Append(uint64(x), data)
+		}
+		f.Close()
+	}
+	for i, tc := range []struct {
+		items uint64
+		limit uint64
+		want  int
+	}{
+		{9, 89, 8},
+		{10, 99, 9},
+		{11, 109, 10},
+		{100, 89, 8},
+		{100, 99, 9},
+		{100, 109, 10},
+	} {
+		{
+			f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 100, true)
+			if err != nil {
+				t.Fatal(err)
+			}
+			items, err := f.RetrieveItems(0, tc.items, tc.limit)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if have, want := len(items), tc.want; have != want {
+				t.Fatalf("test %d: want %d items, have %d ", i, want, have)
+			}
+			for ii, have := range items {
+				want := getChunk(10, ii)
+				if !bytes.Equal(want, have) {
+					t.Fatalf("test %d: data corruption item %d: have\n%x\n, want \n%x\n", i, ii, have, want)
+				}
+			}
+			f.Close()
+		}
+	}
+}
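
As a quick sanity check on the want column in the test table above: each item is 10 bytes, and the retrieval loop accepts items while the running total stays within the byte limit, always returning at least the first item (the i > 0 guard in the first file's diff). Below is a small sketch of that arithmetic; expectedCount is our own illustrative helper, not part of the test or the freezer API.

package main

import "fmt"

// expectedCount mirrors the byte-limit rule for fixed-size items: accept items
// while the running total stays within maxBytes, but always return at least one.
func expectedCount(items, maxBytes, itemSize uint64) uint64 {
	n := maxBytes / itemSize
	if n == 0 {
		n = 1 // the first item is returned even if it alone exceeds the limit
	}
	if n > items {
		n = items
	}
	return n
}

func main() {
	// The (items, limit) pairs from the test table above, all with 10-byte items.
	for _, tc := range [][2]uint64{{9, 89}, {10, 99}, {11, 109}, {100, 89}, {100, 99}, {100, 109}} {
		fmt.Println(tc[0], tc[1], "->", expectedCount(tc[0], tc[1], 10))
	}
}

This prints 8, 9, 10, 8, 9, 10, matching the expected item counts in the table.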
