
core/rawdb: implement sequential reads in freezer_table #23117

Merged: 6 commits, Aug 13, 2021
Changes from 3 commits
5 changes: 5 additions & 0 deletions core/rawdb/database.go
@@ -89,6 +89,11 @@ func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errNotSupported
}

// ReadAncients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) {
return nil, errNotSupported
}

// Ancients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) Ancients() (uint64, error) {
return 0, errNotSupported
12 changes: 12 additions & 0 deletions core/rawdb/freezer.go
@@ -180,6 +180,18 @@ func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errUnknownTable
}

// ReadAncients retrieves multiple items in sequence, starting from the index 'start'.
// It will return
// - at most 'max' items,
// - at least 1 item (even if it exceeds maxByteSize), but will otherwise
// return as many items as fit into maxByteSize.
func (f *freezer) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) {
if table := f.tables[kind]; table != nil {
return table.RetrieveItems(start, max, maxByteSize)
}
return nil, errUnknownTable
}

// Ancients returns the length of the frozen items.
func (f *freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
206 changes: 157 additions & 49 deletions core/rawdb/freezer_table.go
@@ -70,6 +70,19 @@ func (i *indexEntry) marshallBinary() []byte {
return b
}

// bounds returns the start- and end- offsets, and the file number of where to
// read the data item marked by the given indexEntry, which are assumed to be
// sequential.
Member:
"the given indexEntry, which are assumed to be sequential." made me do 5 takes until I understood what it means. Perhaps we could reformulate it like

// bounds returns the start- and end- offsets, and the file number of where to
// read the data item defined by the two index entries. The two entries are
// assumed to be sequential.

func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
if start.filenum != end.filenum {
// If a piece of data 'crosses' a data-file,
// it's actually in one piece on the second data-file.
// We return a zero-indexEntry for the second file as start
return 0, end.offset, end.filenum
}
return start.offset, end.offset, end.filenum
}
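
To make the file-crossing behaviour concrete, here is a minimal, self-contained sketch (it re-declares a stripped-down indexEntry; the real type lives in core/rawdb) showing what bounds returns for an item contained in one file versus one whose index entries span two files:

package main

import "fmt"

// Stripped-down copy of the two indexEntry fields that bounds uses.
type indexEntry struct {
	filenum uint32
	offset  uint32
}

func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
	if start.filenum != end.filenum {
		// The item 'crossed' into a new data file, so the whole item
		// actually lives at the beginning of the second file.
		return 0, end.offset, end.filenum
	}
	return start.offset, end.offset, end.filenum
}

func main() {
	// Item fully inside file 0, occupying bytes [20, 40).
	a, b := &indexEntry{filenum: 0, offset: 20}, &indexEntry{filenum: 0, offset: 40}
	fmt.Println(a.bounds(b)) // 20 40 0

	// Item whose entries span files 0 and 1: read bytes [0, 35) of file 1.
	c, d := &indexEntry{filenum: 0, offset: 90}, &indexEntry{filenum: 1, offset: 35}
	fmt.Println(c.bounds(d)) // 0 35 1
}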

// freezerTable represents a single chained data table within the freezer (e.g. blocks).
// It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry
// file (uncompressed 64 bit indices into the data file).
@@ -546,84 +559,179 @@ func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool
return false, nil
}

// getBounds returns the indexes for the item
// returns start, end, filenumber and error
func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
buffer := make([]byte, indexEntrySize)
var startIdx, endIdx indexEntry
// Read second index
if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil {
return 0, 0, 0, err
// getIndices returns the index entries for the given from-item, covering 'count' items.
// N.B: The actual number of returned indices for N items will always be N+1 (unless an
// error is returned).
func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) {
// Apply the table-offset
from = from - uint64(t.itemOffset)
// For reading N items, we need N+1 indices.
buffer := make([]byte, (count+1)*indexEntrySize)
if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil {
return nil, err
}
endIdx.unmarshalBinary(buffer)
// Read first index (unless it's the very first item)
if item != 0 {
if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil {
return 0, 0, 0, err
}
startIdx.unmarshalBinary(buffer)
} else {
var (
indices []*indexEntry
offset int
)
for i := from; i <= from+count; i++ {
var startIndex = new(indexEntry)
Member:
Seems weird to call this startIndex, since we use it as index for every item. Also consider index := new(indexEntry). No need for the var form.

startIndex.unmarshalBinary(buffer[offset:])
offset += indexEntrySize
indices = append(indices, startIndex)
}
if from == 0 {
// Special case if we're reading the first item in the freezer. We assume that
// the first item always starts from zero (regarding deletion, we only
// support deletion by files, so the assumption holds).
// This means we can use the first item's metadata to carry information about
// the 'global' offset, for the deletion case.
return 0, endIdx.offset, endIdx.filenum, nil
indices[0].offset = 0
indices[0].filenum = indices[1].filenum
Member:
I'm not really following. Isn't the offset already zero if it's the first item? I also don't understand why the file number is assigned?

Member:
Alternatively, isn't this the case for all items that start a new file, not just from == 0?

Member:
Or, is the point that indices[0].filenum might not exist at all (since it's a deleted file), vs. anything else exists and we can "read and see the data overflown"?

Contributor Author:
It's more or less what the comment says above. So each index is a pair of filenum, offset. Like this:

| number | fileno | offset |
|--------|--------|--------|
|  000   |  000   |  000   | 
|  001   |  000   |  020   | 
|  002   |  000   |  040   | 
|  003   |  001   |  020   | 
|  004   |  001   |  040   | 
|  005   |  002   |  020   |  <-- we want to delete anything before here
|  006   |  002   |  040   | 

However, the 0th index will always be | 000 | 000 | 000 |, right?

Therefore, back in the day, we decided to use this quirk to implement deletability. So if we want to delete files 0 and 1, we need to store the item-offset somewhere. So we store that in the offset (which otherwise is the byte-offset within the file -- but we know the first item in a file always has byte offset 0).

| number | fileno | offset |
|--------|--------|--------|
|  000   |  002   |  004   | 
|  001   |  002   |  020   | 
|  002   |  002   |  040   | 

I think you're right though, that we wouldn't really need to set the filenum. However, as you can see in the previous code, for item N, marked by indexA and indexB, the actual data is always stored in the file given by indexB. The two may differ, if the data crosses a file boundary.

In this case, we want to return to the caller info about where to start reading, and where to stop. Therefore, doing this conversion here simplifies for the calling code, which can just treat the indices as direct data information without dealing with the intricacies of the data indexing scheme.
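
To pin that down, here is a hedged sketch of the quirk, reusing the stripped-down indexEntry from the earlier sketch (decodeTableStart is a hypothetical helper for illustration, not part of the package): after tail-deletion, entry 0 of the index no longer means "byte offset 0 in file 0" but instead records where the table now starts.

// decodeTableStart interprets index entry 0 after tail-deletion: its offset
// field is repurposed as the number of deleted items (the 'global' item
// offset), and its filenum points at the first surviving data file.
// The post-deletion entry | 000 | 002 | 004 | from the table above thus
// decodes to firstFile=2, itemOffset=4.
func decodeTableStart(first indexEntry) (itemOffset, firstFile uint32) {
	return first.offset, first.filenum
}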

}
if startIdx.filenum != endIdx.filenum {
// If a piece of data 'crosses' a data-file,
// it's actually in one piece on the second data-file.
// We return a zero-indexEntry for the second file as start
return 0, endIdx.offset, endIdx.filenum, nil
return indices, nil
}

// getBounds returns the start offset, end offset, and file number for the
// given item, plus an error if the index lookup fails.
func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
indices, err := t.getIndices(item, 1)
if err != nil {
return 0, 0, 0, err
}
return startIdx.offset, endIdx.offset, endIdx.filenum, nil
start, end, fileNum := indices[0].bounds(indices[1])
return start, end, fileNum, nil
}

// Retrieve looks up the data offset of an item with the given number and retrieves
// the raw binary blob from the data file.
func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
blob, err := t.retrieve(item)
if items, err := t.RetrieveItems(item, 1, 0); err != nil {
return nil, err
} else {
return items[0], nil
}
}

// RetrieveItems returns multiple items in sequence, starting from the index 'start'.
// It will return at most 'max' items, but will abort earlier to respect the
// 'maxBytes' argument. However, if the 'maxBytes' is smaller than the size of one
// item, it _will_ return one element and possibly overflow the maxBytes.
func (t *freezerTable) RetrieveItems(start, max, maxBytes uint64) ([][]byte, error) {
Member:
Nitpick, but instead of max, I'd call this count

// First we read the 'raw' data, which might be compressed.
diskData, sizes, err := t.retrieveItems(start, max, maxBytes)
if err != nil {
return nil, err
}
if t.noCompression {
return blob, nil
var (
output = make([][]byte, 0, max)
offset int // offset for reading
outputSize int // size of uncompressed data
)
// Now slice up the data and decompress.
for i, diskSize := range sizes {
item := diskData[offset : offset+diskSize]
offset += diskSize
decompressedSize := diskSize
if !t.noCompression {
decompressedSize, _ = snappy.DecodedLen(item)
}
if i > 0 && uint64(outputSize+decompressedSize) > maxBytes {
break
}
if !t.noCompression {
if data, err := snappy.Decode(nil, item); err != nil {
return nil, err
} else {
output = append(output, data)
}
} else {
output = append(output, item)
}
outputSize += decompressedSize
}
return snappy.Decode(nil, blob)
return output, nil
}
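
The maxBytes accounting in the loop above can be isolated into a tiny self-contained sketch (itemsWithinBudget is an illustrative name; sizes stands in for the already-computed decompressed sizes):

// itemsWithinBudget mirrors the loop's budget rule: the first item is always
// returned, even if it alone exceeds maxBytes; each later item must fit.
func itemsWithinBudget(sizes []int, maxBytes uint64) int {
	total := 0
	for i, size := range sizes {
		if i > 0 && uint64(total+size) > maxBytes {
			return i
		}
		total += size
	}
	return len(sizes)
}

// itemsWithinBudget([]int{50, 50, 50}, 10) == 1 (first item always returned)
// itemsWithinBudget([]int{10, 10, 10}, 25) == 2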

// retrieve looks up the data offset of an item with the given number and retrieves
// the raw binary blob from the data file. OBS! This method does not decode
// compressed data.
func (t *freezerTable) retrieve(item uint64) ([]byte, error) {
func (t *freezerTable) retrieveItems(start, max, maxBytes uint64) ([]byte, []int, error) {
t.lock.RLock()
defer t.lock.RUnlock()
// Ensure the table and the item is accessible
if t.index == nil || t.head == nil {
return nil, errClosed
return nil, nil, errClosed
}
if atomic.LoadUint64(&t.items) <= item {
return nil, errOutOfBounds
itemCount := atomic.LoadUint64(&t.items) // max number
// Ensure the start is written, not deleted from the tail, and that the
// caller actually wants something
if itemCount <= start || uint64(t.itemOffset) > start || max == 0 {
return nil, nil, errOutOfBounds
}
// Ensure the item was not deleted from the tail either
if uint64(t.itemOffset) > item {
return nil, errOutOfBounds
if start+max > itemCount {
max = itemCount - start
}
startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset))
if err != nil {
return nil, err
var (
output = make([]byte, maxBytes) // Buffer to read data into
outputSize int // Used size of that buffer
)
// Read the data
readData := func(fileId, start uint32, length int) error {
// In case a small limit is used, and the elements are large, we may need
// to realloc the read-buffer when reading the first (and only) item.
if len(output) < length {
output = make([]byte, length)
}
if dataFile, exist := t.files[fileId]; !exist {
return fmt.Errorf("missing data file %d", fileId)
} else if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil {
return err
}
outputSize += length
return nil
}
dataFile, exist := t.files[filenum]
if !exist {
return nil, fmt.Errorf("missing data file %d", filenum)
// Read all the indexes in one go
indices, err := t.getIndices(start, max)
if err != nil {
return nil, nil, err
}
// Retrieve the data itself, decompress and return
blob := make([]byte, endOffset-startOffset)
if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil {
return nil, err
var (
sizes []int // The sizes for each element
totalSize = 0 // The total size of all data read so far
readStart = indices[0].offset // Where, in the file, to start reading
unreadSize = 0 // The size of the as-yet-unread data
)

for i, firstIndex := range indices[:len(indices)-1] {
secondIndex := indices[i+1]
// Determine the size of the item.
offset1, offset2, _ := firstIndex.bounds(secondIndex)
size := int(offset2 - offset1)
// Crossing a file boundary?
if secondIndex.filenum != firstIndex.filenum {
if unreadSize > 0 {
// If we have unread data in the first file, we need to do that read now.
if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil {
return nil, nil, err
}
unreadSize = 0
}
readStart = 0
}
if i > 0 && uint64(totalSize+size) > maxBytes {
break
}
// Defer the read for later
unreadSize += size
totalSize += size
sizes = append(sizes, size)
if i == len(indices)-2 || uint64(totalSize) > maxBytes {
// Last item, need to do the read now
if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
return nil, nil, err
}
break
}
}
t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize))
return blob, nil
return output[:outputSize], sizes, nil
}
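
The central idea above is that consecutive items living in the same data file are served by a single ReadAt, and the pending read is flushed whenever the next item starts in a new file. A simplified sketch of just that coalescing step (span and coalesce are illustrative names, not part of the package):

// span describes one contiguous disk read: 'length' bytes from file
// 'fileId' starting at byte offset 'start'.
type span struct {
	fileId uint32
	start  uint32
	length int
}

// coalesce merges per-item reads into one span per data file, assuming
// files[i]/starts[i]/sizes[i] describe where item i begins and how big it is.
func coalesce(files []uint32, starts []uint32, sizes []int) []span {
	if len(sizes) == 0 {
		return nil
	}
	pending := span{fileId: files[0], start: starts[0]}
	var out []span
	for i, size := range sizes {
		if files[i] != pending.fileId {
			// The next item starts in a new file: flush what we have.
			out = append(out, pending)
			pending = span{fileId: files[i], start: starts[i]}
		}
		pending.length += size
	}
	return append(out, pending)
}

For example, three 20-byte items at offsets 20 and 40 of file 0 and offset 0 of file 1 collapse into two reads: 40 bytes from file 0 and 20 bytes from file 1.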

// has returns an indicator whether the specified number data
62 changes: 61 additions & 1 deletion core/rawdb/freezer_table_test.go
@@ -74,7 +74,7 @@ func TestFreezerBasics(t *testing.T) {
exp := getChunk(15, y)
got, err := f.Retrieve(uint64(y))
if err != nil {
t.Fatal(err)
t.Fatalf("reading item %d: %v", y, err)
}
if !bytes.Equal(got, exp) {
t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
@@ -692,3 +692,63 @@ func TestAppendTruncateParallel(t *testing.T) {
}
}
}

// TestSequentialRead does some basic tests on RetrieveItems.
func TestSequentialRead(t *testing.T) {
rm, wm, sg := metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge()
fname := fmt.Sprintf("batchread-%d", rand.Uint64())
{ // Fill table
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
// Write 15 bytes 30 times
for x := 0; x < 30; x++ {
data := getChunk(15, x)
f.Append(uint64(x), data)
}
f.DumpIndex(0, 30)
f.Close()
}
{ // Open it, iterate, verify iteration
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 50, true)
if err != nil {
t.Fatal(err)
}
items, err := f.RetrieveItems(0, 10000, 100000)
if err != nil {
t.Fatal(err)
}
if have, want := len(items), 30; have != want {
t.Fatalf("want %d items, have %d ", want, have)
}
for i, have := range items {
want := getChunk(15, i)
if !bytes.Equal(want, have) {
t.Fatalf("data corruption: have\n%x\n, want \n%x\n", have, want)
}
}
f.Close()
}
{ // Open it, iterate, verify byte limit. The byte limit is less than the item
// size, so each lookup should only return one item
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true)
if err != nil {
t.Fatal(err)
}
items, err := f.RetrieveItems(0, 10000, 10)
if err != nil {
t.Fatal(err)
}
if have, want := len(items), 1; have != want {
t.Fatalf("want %d items, have %d ", want, have)
}
for i, have := range items {
want := getChunk(15, i)
if !bytes.Equal(want, have) {
t.Fatalf("data corruption: have\n%x\n, want \n%x\n", have, want)
}
}
f.Close()
}
}
6 changes: 6 additions & 0 deletions core/rawdb/table.go
@@ -62,6 +62,12 @@ func (t *table) Ancient(kind string, number uint64) ([]byte, error) {
return t.db.Ancient(kind, number)
}

// ReadAncients is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) {
return t.db.ReadAncients(kind, start, max, maxByteSize)
}

// Ancients is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) Ancients() (uint64, error) {
7 changes: 7 additions & 0 deletions ethdb/database.go
@@ -76,6 +76,13 @@ type AncientReader interface {
// Ancient retrieves an ancient binary blob from the append-only immutable files.
Ancient(kind string, number uint64) ([]byte, error)

// ReadAncients retrieves multiple items in sequence, starting from the index 'start'.
// It will return
// - at most 'max' items,
// - at least 1 item (even if it exceeds maxByteSize), but will otherwise
// return as many items as fit into maxByteSize.
ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error)

// Ancients returns the ancient item numbers in the ancient store.
Ancients() (uint64, error)

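As a usage illustration for the extended AncientReader interface, a hypothetical caller could stream a frozen table in bounded batches roughly like this (the "headers" kind, the batch size, and the byte cap are assumptions for the example, and the ethdb package is assumed to be imported):

// iterateAncients walks every frozen item of one kind, bounding each
// ReadAncients call to 1024 items or roughly 1 MiB, whichever is hit first.
// Since at least one item is always returned, the loop always progresses.
func iterateAncients(db ethdb.AncientReader) error {
	frozen, err := db.Ancients()
	if err != nil {
		return err
	}
	for start := uint64(0); start < frozen; {
		batch, err := db.ReadAncients("headers", start, 1024, 1024*1024)
		if err != nil {
			return err
		}
		for _, blob := range batch {
			_ = blob // process the raw blob
		}
		start += uint64(len(batch))
	}
	return nil
}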