Skip to content

Commit

Permalink
core/rawdb: implement sequential reads in freezer_table (ethereum#23117)
Browse files Browse the repository at this point in the history
* core/rawdb: implement sequential reads in freezer_table

* core/rawdb, ethdb: add sequential reader to db interface

* core/rawdb: lint nitpicks

* core/rawdb: fix some nitpicks

* core/rawdb: fix flaw with deferred reads not being performed

* core/rawdb: better documentation
  • Loading branch information
holiman authored and atif-konasl committed Oct 15, 2021
1 parent 9083c0a commit 4c04dd6
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 52 deletions.
5 changes: 5 additions & 0 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errNotSupported
}

// ReadAncients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) {
return nil, errNotSupported
}

// Ancients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) Ancients() (uint64, error) {
return 0, errNotSupported
Expand Down
12 changes: 12 additions & 0 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errUnknownTable
}

// ReadAncients retrieves multiple items in sequence, starting from the index 'start'.
// It will return
// - at most 'max' items,
// - at least 1 item (even if exceeding the maxByteSize), but will otherwise
// return as many items as fit into maxByteSize.
func (f *freezer) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) {
if table := f.tables[kind]; table != nil {
return table.RetrieveItems(start, count, maxBytes)
}
return nil, errUnknownTable
}

// Ancients returns the length of the frozen items.
func (f *freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
Expand Down
214 changes: 163 additions & 51 deletions core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ func (i *indexEntry) marshallBinary() []byte {
return b
}

// bounds returns the start- and end- offsets, and the file number of where to
// read there data item marked by the two index entries. The two entries are
// assumed to be sequential.
func (start *indexEntry) bounds(end *indexEntry) (startOffset, endOffset, fileId uint32) {
if start.filenum != end.filenum {
// If a piece of data 'crosses' a data-file,
// it's actually in one piece on the second data-file.
// We return a zero-indexEntry for the second file as start
return 0, end.offset, end.filenum
}
return start.offset, end.offset, end.filenum
}

// freezerTable represents a single chained data table within the freezer (e.g. blocks).
// It consists of a data file (snappy encoded arbitrary data blobs) and an indexEntry
// file (uncompressed 64 bit indices into the data file).
Expand Down Expand Up @@ -546,84 +559,183 @@ func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool
return false, nil
}

// getBounds returns the indexes for the item
// returns start, end, filenumber and error
func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
buffer := make([]byte, indexEntrySize)
var startIdx, endIdx indexEntry
// Read second index
if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil {
return 0, 0, 0, err
}
endIdx.unmarshalBinary(buffer)
// Read first index (unless it's the very first item)
if item != 0 {
if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil {
return 0, 0, 0, err
}
startIdx.unmarshalBinary(buffer)
} else {
// getIndices returns the index entries for the given from-item, covering 'count' items.
// N.B: The actual number of returned indices for N items will always be N+1 (unless an
// error is returned).
// OBS: This method assumes that the caller has already verified (and/or trimmed) the range
// so that the items are within bounds. If this method is used to read out of bounds,
// it will return error.
func (t *freezerTable) getIndices(from, count uint64) ([]*indexEntry, error) {
// Apply the table-offset
from = from - uint64(t.itemOffset)
// For reading N items, we need N+1 indices.
buffer := make([]byte, (count+1)*indexEntrySize)
if _, err := t.index.ReadAt(buffer, int64(from*indexEntrySize)); err != nil {
return nil, err
}
var (
indices []*indexEntry
offset int
)
for i := from; i <= from+count; i++ {
index := new(indexEntry)
index.unmarshalBinary(buffer[offset:])
offset += indexEntrySize
indices = append(indices, index)
}
if from == 0 {
// Special case if we're reading the first item in the freezer. We assume that
// the first item always start from zero(regarding the deletion, we
// only support deletion by files, so that the assumption is held).
// This means we can use the first item metadata to carry information about
// the 'global' offset, for the deletion-case
return 0, endIdx.offset, endIdx.filenum, nil
indices[0].offset = 0
indices[0].filenum = indices[1].filenum
}
if startIdx.filenum != endIdx.filenum {
// If a piece of data 'crosses' a data-file,
// it's actually in one piece on the second data-file.
// We return a zero-indexEntry for the second file as start
return 0, endIdx.offset, endIdx.filenum, nil
}
return startIdx.offset, endIdx.offset, endIdx.filenum, nil
return indices, nil
}

// Retrieve looks up the data offset of an item with the given number and retrieves
// the raw binary blob from the data file.
func (t *freezerTable) Retrieve(item uint64) ([]byte, error) {
blob, err := t.retrieve(item)
items, err := t.RetrieveItems(item, 1, 0)
if err != nil {
return nil, err
}
if t.noCompression {
return blob, nil
return items[0], nil
}

// RetrieveItems returns multiple items in sequence, starting from the index 'start'.
// It will return at most 'max' items, but will abort earlier to respect the
// 'maxBytes' argument. However, if the 'maxBytes' is smaller than the size of one
// item, it _will_ return one element and possibly overflow the maxBytes.
func (t *freezerTable) RetrieveItems(start, count, maxBytes uint64) ([][]byte, error) {
// First we read the 'raw' data, which might be compressed.
diskData, sizes, err := t.retrieveItems(start, count, maxBytes)
if err != nil {
return nil, err
}
return snappy.Decode(nil, blob)
var (
output = make([][]byte, 0, count)
offset int // offset for reading
outputSize int // size of uncompressed data
)
// Now slice up the data and decompress.
for i, diskSize := range sizes {
item := diskData[offset : offset+diskSize]
offset += diskSize
decompressedSize := diskSize
if !t.noCompression {
decompressedSize, _ = snappy.DecodedLen(item)
}
if i > 0 && uint64(outputSize+decompressedSize) > maxBytes {
break
}
if !t.noCompression {
data, err := snappy.Decode(nil, item)
if err != nil {
return nil, err
}
output = append(output, data)
} else {
output = append(output, item)
}
outputSize += decompressedSize
}
return output, nil
}

// retrieve looks up the data offset of an item with the given number and retrieves
// the raw binary blob from the data file. OBS! This method does not decode
// compressed data.
func (t *freezerTable) retrieve(item uint64) ([]byte, error) {
// retrieveItems reads up to 'count' items from the table. It reads at least
// one item, but otherwise avoids reading more than maxBytes bytes.
// It returns the (potentially compressed) data, and the sizes.
func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []int, error) {
t.lock.RLock()
defer t.lock.RUnlock()
// Ensure the table and the item is accessible
if t.index == nil || t.head == nil {
return nil, errClosed
return nil, nil, errClosed
}
if atomic.LoadUint64(&t.items) <= item {
return nil, errOutOfBounds
itemCount := atomic.LoadUint64(&t.items) // max number
// Ensure the start is written, not deleted from the tail, and that the
// caller actually wants something
if itemCount <= start || uint64(t.itemOffset) > start || count == 0 {
return nil, nil, errOutOfBounds
}
// Ensure the item was not deleted from the tail either
if uint64(t.itemOffset) > item {
return nil, errOutOfBounds
if start+count > itemCount {
count = itemCount - start
}
startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset))
if err != nil {
return nil, err
var (
output = make([]byte, maxBytes) // Buffer to read data into
outputSize int // Used size of that buffer
)
// readData is a helper method to read a single data item from disk.
readData := func(fileId, start uint32, length int) error {
// In case a small limit is used, and the elements are large, may need to
// realloc the read-buffer when reading the first (and only) item.
if len(output) < length {
output = make([]byte, length)
}
dataFile, exist := t.files[fileId]
if !exist {
return fmt.Errorf("missing data file %d", fileId)
}
if _, err := dataFile.ReadAt(output[outputSize:outputSize+length], int64(start)); err != nil {
return err
}
outputSize += length
return nil
}
dataFile, exist := t.files[filenum]
if !exist {
return nil, fmt.Errorf("missing data file %d", filenum)
// Read all the indexes in one go
indices, err := t.getIndices(start, count)
if err != nil {
return nil, nil, err
}
// Retrieve the data itself, decompress and return
blob := make([]byte, endOffset-startOffset)
if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil {
return nil, err
var (
sizes []int // The sizes for each element
totalSize = 0 // The total size of all data read so far
readStart = indices[0].offset // Where, in the file, to start reading
unreadSize = 0 // The size of the as-yet-unread data
)

for i, firstIndex := range indices[:len(indices)-1] {
secondIndex := indices[i+1]
// Determine the size of the item.
offset1, offset2, _ := firstIndex.bounds(secondIndex)
size := int(offset2 - offset1)
// Crossing a file boundary?
if secondIndex.filenum != firstIndex.filenum {
// If we have unread data in the first file, we need to do that read now.
if unreadSize > 0 {
if err := readData(firstIndex.filenum, readStart, unreadSize); err != nil {
return nil, nil, err
}
unreadSize = 0
}
readStart = 0
}
if i > 0 && uint64(totalSize+size) > maxBytes {
// About to break out due to byte limit being exceeded. We don't
// read this last item, but we need to do the deferred reads now.
if unreadSize > 0 {
if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
return nil, nil, err
}
}
break
}
// Defer the read for later
unreadSize += size
totalSize += size
sizes = append(sizes, size)
if i == len(indices)-2 || uint64(totalSize) > maxBytes {
// Last item, need to do the read now
if err := readData(secondIndex.filenum, readStart, unreadSize); err != nil {
return nil, nil, err
}
break
}
}
t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize))
return blob, nil
return output[:outputSize], sizes, nil
}

// has returns an indicator whether the specified number data
Expand Down
Loading

0 comments on commit 4c04dd6

Please sign in to comment.