Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse key buffers during hashing #2902

Merged
merged 33 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
15e17bf
Cleanup calculateNodeIDs
StephenButtolph Mar 31, 2024
4ea7ef0
nit
StephenButtolph Mar 31, 2024
3c78610
typo
StephenButtolph Mar 31, 2024
eaa64a0
Add initial hashing benchmark
StephenButtolph Mar 31, 2024
4dec1ab
add tests
StephenButtolph Mar 31, 2024
dfeb3fb
nits
StephenButtolph Mar 31, 2024
2cf95e5
Remove allocations for keys
StephenButtolph Mar 31, 2024
7d6ac6c
typo
StephenButtolph Mar 31, 2024
a018121
comment
StephenButtolph Apr 1, 2024
256cef1
comment
StephenButtolph Apr 1, 2024
e037d5d
Replace weighted semaphore with channel
StephenButtolph Apr 1, 2024
c4ffa4c
fix concurrency
StephenButtolph Apr 1, 2024
a75f0e8
lint
StephenButtolph Apr 1, 2024
8c555d4
wip remove key allocations
StephenButtolph Apr 1, 2024
3c9171e
Avoid duplicate bytes copy
StephenButtolph Apr 1, 2024
8e6c4e0
Avoid allocating buffer for node with no children
StephenButtolph Apr 1, 2024
cbf3a58
comments
StephenButtolph Apr 1, 2024
e566d15
nit
StephenButtolph Apr 1, 2024
e6d506b
Fix key overwrite
StephenButtolph Apr 2, 2024
8a880e4
nit
StephenButtolph Apr 2, 2024
0209ded
Address nits
StephenButtolph Apr 2, 2024
46d52bc
merged
StephenButtolph Apr 2, 2024
97109b3
nit
StephenButtolph Apr 2, 2024
9df417d
nit + comment
StephenButtolph Apr 2, 2024
1a3c9b5
nit
StephenButtolph Apr 2, 2024
dc70e38
merged
StephenButtolph Apr 2, 2024
5da2513
merged
StephenButtolph Apr 2, 2024
f6b0cb3
nits
StephenButtolph Apr 2, 2024
2f8b836
typo
StephenButtolph Apr 2, 2024
e53725f
fix lock order
StephenButtolph Apr 2, 2024
3734e44
add comment
StephenButtolph Apr 2, 2024
fbdfc20
reverse however
StephenButtolph Apr 2, 2024
84ef6ef
add comment
StephenButtolph Apr 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions x/merkledb/bytes_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package merkledb

import "sync"

// bytesPool combines a counting semaphore with a free-list of byte slices.
// At most [numSlots] buffers may be held by callers at any one time:
// Acquire blocks until a slot frees up, TryAcquire fails fast. Buffers
// handed back via Release are retained so later acquirers can reuse their
// backing arrays instead of allocating.
type bytesPool struct {
	// slots acts as the semaphore: a successful send claims a slot, a
	// receive in Release frees it.
	slots chan struct{}
	// bytesLock guards bytes.
	bytesLock sync.Mutex
	// bytes is a LIFO free-list of previously released buffers. It may
	// hold fewer entries than there are free slots, in which case
	// acquirers receive nil.
	bytes [][]byte
}

// newBytesPool returns a pool that allows up to [numSlots] buffers to be
// held concurrently.
func newBytesPool(numSlots int) *bytesPool {
	pool := &bytesPool{
		slots: make(chan struct{}, numSlots),
		bytes: make([][]byte, 0, numSlots),
	}
	return pool
}

// Acquire blocks until a slot is free and then returns a reusable buffer,
// or nil if the free-list is empty.
func (p *bytesPool) Acquire() []byte {
	p.slots <- struct{}{}
	return p.pop()
}

// TryAcquire is the non-blocking variant of Acquire. It reports false
// without a buffer when every slot is currently held.
func (p *bytesPool) TryAcquire() ([]byte, bool) {
	select {
	case p.slots <- struct{}{}:
	default:
		return nil, false
	}
	return p.pop(), true
}

// pop removes and returns the most recently released buffer, or nil when
// none are stored.
func (p *bytesPool) pop() []byte {
	p.bytesLock.Lock()
	defer p.bytesLock.Unlock()

	if last := len(p.bytes) - 1; last >= 0 {
		b := p.bytes[last]
		p.bytes = p.bytes[:last]
		return b
	}
	return nil
}

// Release stores [b] for reuse and frees the caller's slot. Calling
// Release without a matching Acquire/TryAcquire panics.
func (p *bytesPool) Release(b []byte) {
	// Before waking anyone waiting on a slot, return the bytes.
	p.bytesLock.Lock()
	p.bytes = append(p.bytes, b)
	p.bytesLock.Unlock()

	select {
	case <-p.slots:
	default:
		panic("release of unacquired semaphore")
	}
}
46 changes: 46 additions & 0 deletions x/merkledb/bytes_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package merkledb

import "testing"

// Benchmark_BytesPool_Acquire measures acquiring a slot when one is always
// available (the pool is sized to b.N, so no Acquire ever blocks).
func Benchmark_BytesPool_Acquire(b *testing.B) {
	pool := newBytesPool(b.N)

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		pool.Acquire()
	}
}

// Benchmark_BytesPool_Release measures releasing a slot. Every slot is
// acquired up front so each timed Release has a matching Acquire.
func Benchmark_BytesPool_Release(b *testing.B) {
	pool := newBytesPool(b.N)
	for n := 0; n < b.N; n++ {
		pool.Acquire()
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		pool.Release(nil)
	}
}

// Benchmark_BytesPool_TryAcquire_Success measures TryAcquire on a pool
// with enough slots that every attempt succeeds.
func Benchmark_BytesPool_TryAcquire_Success(b *testing.B) {
	pool := newBytesPool(b.N)

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		pool.TryAcquire()
	}
}

// Benchmark_BytesPool_TryAcquire_Failure measures TryAcquire when it must
// fail: the pool's single slot is held for the whole benchmark.
func Benchmark_BytesPool_TryAcquire_Failure(b *testing.B) {
	pool := newBytesPool(1)
	pool.Acquire()

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		pool.TryAcquire()
	}
}
24 changes: 12 additions & 12 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/exp/maps"
"golang.org/x/sync/semaphore"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
Expand Down Expand Up @@ -216,9 +215,10 @@ type merkleDB struct {
// Valid children of this trie.
childViews []*view

// hashNodesSema controls the number of goroutines that are created inside
// [hashChangedNode] at any given time.
hashNodesSema *semaphore.Weighted
// hashNodesKeyPool controls the number of goroutines that are created
// inside [hashChangedNode] at any given time and provides slices for the
// keys needed while hashing.
hashNodesKeyPool *bytesPool

tokenSize int
}
Expand All @@ -242,9 +242,9 @@ func newDatabase(
return nil, err
}

rootGenConcurrency := uint(runtime.NumCPU())
rootGenConcurrency := runtime.NumCPU()
if config.RootGenConcurrency != 0 {
rootGenConcurrency = config.RootGenConcurrency
rootGenConcurrency = int(config.RootGenConcurrency)
}

// Share a sync.Pool of []byte between the intermediateNodeDB and valueNodeDB
Expand All @@ -270,12 +270,12 @@ func newDatabase(
bufferPool,
metrics,
int(config.ValueNodeCacheSize)),
history: newTrieHistory(int(config.HistoryLength)),
debugTracer: getTracerIfEnabled(config.TraceLevel, DebugTrace, config.Tracer),
infoTracer: getTracerIfEnabled(config.TraceLevel, InfoTrace, config.Tracer),
childViews: make([]*view, 0, defaultPreallocationSize),
hashNodesSema: semaphore.NewWeighted(int64(rootGenConcurrency)),
tokenSize: BranchFactorToTokenSize[config.BranchFactor],
history: newTrieHistory(int(config.HistoryLength)),
debugTracer: getTracerIfEnabled(config.TraceLevel, DebugTrace, config.Tracer),
infoTracer: getTracerIfEnabled(config.TraceLevel, InfoTrace, config.Tracer),
childViews: make([]*view, 0, defaultPreallocationSize),
hashNodesKeyPool: newBytesPool(rootGenConcurrency),
tokenSize: BranchFactorToTokenSize[config.BranchFactor],
}

if err := trieDB.initializeRoot(); err != nil {
Expand Down
131 changes: 88 additions & 43 deletions x/merkledb/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,31 +281,31 @@ func (v *view) hashChangedNodes(ctx context.Context) {
return
}

_ = v.db.hashNodesSema.Acquire(context.Background(), 1)
defer v.db.hashNodesSema.Release(1)
// If there are no children, we can avoid allocating [keyBuffer].
root := v.root.Value()
if len(root.children) == 0 {
v.changes.rootID = root.calculateID(v.db.metrics)
return
}

v.changes.rootID = v.hashChangedNode(v.root.Value())
// Allocate [keyBuffer] and populate it with the root node's key.
keyBuffer := v.db.hashNodesKeyPool.Acquire()
keyBuffer = v.setKeyBuffer(root, keyBuffer)
v.changes.rootID, keyBuffer = v.hashChangedNode(root, keyBuffer)
v.db.hashNodesKeyPool.Release(keyBuffer)
danlaine marked this conversation as resolved.
Show resolved Hide resolved
}

// Calculates the ID of all descendants of [n] which need to be recalculated,
// and then calculates the ID of [n] itself.
func (v *view) hashChangedNode(n *node) ids.ID {
// If there are no children, we can avoid allocating [keyBuffer].
if len(n.children) == 0 {
return n.calculateID(v.db.metrics)
}

// Calculate the size of the largest child key of this node. This allows
// only allocating a single slice for all of the keys.
var maxChildBitLength int
for _, childEntry := range n.children {
maxChildBitLength = max(maxChildBitLength, childEntry.compressedKey.length)
}

//
// Returns a potentially expanded [keyBuffer]. By returning this value this
// function is able to have a maximum total number of allocations shared across
// multiple invocations.
//
// Invariant: [keyBuffer] must be populated with [n]'s key and have sufficient
// length to contain any of [n]'s child keys.
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
func (v *view) hashChangedNode(n *node, keyBuffer []byte) (ids.ID, []byte) {
var (
maxBytesNeeded = bytesNeeded(n.key.length + v.tokenSize + maxChildBitLength)
// keyBuffer is allocated onto the heap because it is dynamically sized.
keyBuffer = make([]byte, maxBytesNeeded)
// childBuffer is allocated on the stack.
childBuffer = make([]byte, 1)
dualIndex = dualBitIndex(v.tokenSize)
Expand All @@ -318,14 +318,13 @@ func (v *view) hashChangedNode(n *node) ids.ID {
// We use [wg] to wait until all descendants of [n] have been updated.
wg waitGroup
)

if bytesForKey > 0 {
// We only need to copy this node's key once because it does not change
// as we iterate over the children.
copy(keyBuffer, n.key.value)
lastKeyByte = keyBuffer[bytesForKey-1]
}

// This loop is optimized to avoid allocations when calculating the
// [childKey] by reusing [keyBuffer] and leaving the first [bytesForKey-1]
// bytes unmodified.
for childIndex, childEntry := range n.children {
childBuffer[0] = childIndex << dualIndex
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
childIndexAsKey := Key{
Expand All @@ -336,17 +335,26 @@ func (v *view) hashChangedNode(n *node) ids.ID {
}

totalBitLength := n.key.length + v.tokenSize + childEntry.compressedKey.length
buffer := keyBuffer[:bytesNeeded(totalBitLength)]
// Make sure the last byte of the key is originally set correctly
// Because [keyBuffer] may have been modified in a prior iteration of
// this loop, it is not guaranteed that its length is at least
// [bytesNeeded(totalBitLength)]. However, that's fine. The below
// slicing would only panic if the buffer didn't have sufficient
// capacity.
keyBuffer = keyBuffer[:bytesNeeded(totalBitLength)]
// We don't need to copy this node's key. It's assumed to already be
// correct; except for the last byte. We must make sure the last byte of
// the key is set correctly because extendIntoBuffer may OR bits from
// the extension and overwrite the last byte. However, extendIntoBuffer
// does not modify the first [bytesForKey-1] bytes of [keyBuffer].
if bytesForKey > 0 {
buffer[bytesForKey-1] = lastKeyByte
keyBuffer[bytesForKey-1] = lastKeyByte
}
extendIntoBuffer(buffer, childIndexAsKey, n.key.length)
extendIntoBuffer(buffer, childEntry.compressedKey, n.key.length+v.tokenSize)
extendIntoBuffer(keyBuffer, childIndexAsKey, n.key.length)
extendIntoBuffer(keyBuffer, childEntry.compressedKey, n.key.length+v.tokenSize)
childKey := Key{
// It is safe to use byteSliceToString because [buffer] is not
// It is safe to use byteSliceToString because [keyBuffer] is not
// modified while [childKey] is in use.
value: byteSliceToString(buffer),
value: byteSliceToString(keyBuffer),
length: totalBitLength,
}

Expand All @@ -355,32 +363,69 @@ func (v *view) hashChangedNode(n *node) ids.ID {
// This child wasn't changed.
continue
}
childEntry.hasValue = childNodeChange.after.hasValue()

childNode := childNodeChange.after
childEntry.hasValue = childNode.hasValue()

// If there are no children of the childNode, we can avoid constructing
// the buffer for the child keys.
if len(childNode.children) == 0 {
childEntry.id = childNode.calculateID(v.db.metrics)
continue
}

// Try updating the child and its descendants in a goroutine.
if ok := v.db.hashNodesSema.TryAcquire(1); ok {
if childKeyBuffer, ok := v.db.hashNodesKeyPool.TryAcquire(); ok {
wg.Add(1)

// Passing variables explicitly through the function call rather
// than implicitly passing them through the scope of the function
// definition allows the passed variables to be allocated on the
// stack.
go func(wg *sync.WaitGroup, childEntry *child) {
childEntry.id = v.hashChangedNode(childNodeChange.after)
v.db.hashNodesSema.Release(1)
go func(wg *sync.WaitGroup, childEntry *child, childNode *node, childKeyBuffer []byte) {
childKeyBuffer = v.setKeyBuffer(childNode, childKeyBuffer)
childEntry.id, childKeyBuffer = v.hashChangedNode(childNode, childKeyBuffer)
v.db.hashNodesKeyPool.Release(childKeyBuffer)
wg.Done()
}(wg.wg, childEntry)
}(wg.wg, childEntry, childNode, childKeyBuffer)
} else {
// We're at the goroutine limit; do the work in this goroutine.
childEntry.id = v.hashChangedNode(childNodeChange.after)
//
// We can skip copying the key here because [keyBuffer] is already
// constructed to be childNode's key.
keyBuffer = v.setLengthForChildren(childNode, keyBuffer)
childEntry.id, keyBuffer = v.hashChangedNode(childNode, keyBuffer)
}
}

// Wait until all descendants of [n] have been updated.
wg.Wait()

// The IDs [n]'s descendants are up to date so we can calculate [n]'s ID.
return n.calculateID(v.db.metrics)
return n.calculateID(v.db.metrics), keyBuffer
}

// setKeyBuffer expands [keyBuffer] to have sufficient size for any of [n]'s
// child keys and populates [n]'s key into [keyBuffer]. If [keyBuffer] already
// has sufficient size, this function will not perform any memory allocations.
// setKeyBuffer expands [keyBuffer] to have sufficient size for any of [n]'s
// child keys and then copies [n]'s key bytes into the front of the buffer.
// If [keyBuffer] already has sufficient capacity, no allocation occurs.
func (v *view) setKeyBuffer(n *node, keyBuffer []byte) []byte {
	sized := v.setLengthForChildren(n, keyBuffer)
	copy(sized, n.key.value)
	return sized
}

// setLengthForChildren expands [keyBuffer] to have sufficient size for any of
// [n]'s child keys.
// setLengthForChildren resizes [keyBuffer] so it can hold the full key of
// any of [n]'s children.
func (v *view) setLengthForChildren(n *node, keyBuffer []byte) []byte {
	// Find the longest compressed child key so a single buffer suffices
	// for every child.
	longest := 0
	for _, childEntry := range n.children {
		if childEntry.compressedKey.length > longest {
			longest = childEntry.compressedKey.length
		}
	}
	return setBytesLength(keyBuffer, bytesNeeded(n.key.length+v.tokenSize+longest))
}

// setBytesLength returns [b] with its length set to exactly [size]. When
// the existing capacity suffices, the same backing array is resliced;
// otherwise the slice is grown by appending zeroed bytes past the current
// capacity.
func setBytesLength(b []byte, size int) []byte {
	if extra := size - cap(b); extra > 0 {
		// Keep every byte already in the backing array and append
		// zeroed bytes to reach the requested size.
		return append(b[:cap(b)], make([]byte, extra)...)
	}
	return b[:size]
}

// GetProof returns a proof that [bytesPath] is in or not in trie [t].
Expand Down
Loading