Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse key buffers during hashing #2902

Merged
merged 33 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
15e17bf
Cleanup calculateNodeIDs
StephenButtolph Mar 31, 2024
4ea7ef0
nit
StephenButtolph Mar 31, 2024
3c78610
typo
StephenButtolph Mar 31, 2024
eaa64a0
Add initial hashing benchmark
StephenButtolph Mar 31, 2024
4dec1ab
add tests
StephenButtolph Mar 31, 2024
dfeb3fb
nits
StephenButtolph Mar 31, 2024
2cf95e5
Remove allocations for keys
StephenButtolph Mar 31, 2024
7d6ac6c
typo
StephenButtolph Mar 31, 2024
a018121
comment
StephenButtolph Apr 1, 2024
256cef1
comment
StephenButtolph Apr 1, 2024
e037d5d
Replace weighted semaphore with channel
StephenButtolph Apr 1, 2024
c4ffa4c
fix concurrency
StephenButtolph Apr 1, 2024
a75f0e8
lint
StephenButtolph Apr 1, 2024
8c555d4
wip remove key allocations
StephenButtolph Apr 1, 2024
3c9171e
Avoid duplicate bytes copy
StephenButtolph Apr 1, 2024
8e6c4e0
Avoid allocating buffer for node with no children
StephenButtolph Apr 1, 2024
cbf3a58
comments
StephenButtolph Apr 1, 2024
e566d15
nit
StephenButtolph Apr 1, 2024
e6d506b
Fix key overwrite
StephenButtolph Apr 2, 2024
8a880e4
nit
StephenButtolph Apr 2, 2024
0209ded
Address nits
StephenButtolph Apr 2, 2024
46d52bc
merged
StephenButtolph Apr 2, 2024
97109b3
nit
StephenButtolph Apr 2, 2024
9df417d
nit + comment
StephenButtolph Apr 2, 2024
1a3c9b5
nit
StephenButtolph Apr 2, 2024
dc70e38
merged
StephenButtolph Apr 2, 2024
5da2513
merged
StephenButtolph Apr 2, 2024
f6b0cb3
nits
StephenButtolph Apr 2, 2024
2f8b836
typo
StephenButtolph Apr 2, 2024
e53725f
fix lock order
StephenButtolph Apr 2, 2024
3734e44
add comment
StephenButtolph Apr 2, 2024
fbdfc20
reverse however
StephenButtolph Apr 2, 2024
84ef6ef
add comment
StephenButtolph Apr 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions x/merkledb/bytes_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package merkledb

import "sync"

// bytesPool is a bounded pool of reusable byte slices.
//
// It pairs a counting semaphore (slots) with a stack of buffers that were
// previously handed back through Release. At most cap(slots) acquisitions can
// be outstanding at any time.
type bytesPool struct {
	// slots is the semaphore: one element is sent for every successful
	// acquisition and one is received for every Release.
	slots chan struct{}
	// bytesLock guards bytes.
	bytesLock sync.Mutex
	// bytes is a LIFO stack of buffers returned through Release.
	bytes [][]byte
}

// newBytesPool returns a pool that allows up to numSlots outstanding
// acquisitions. The buffer stack is preallocated with room for numSlots
// entries but starts out empty.
func newBytesPool(numSlots int) *bytesPool {
	return &bytesPool{
		slots: make(chan struct{}, numSlots),
		bytes: make([][]byte, 0, numSlots),
	}
}

// Acquire blocks until a slot is available and then returns the most recently
// released buffer, or nil if no buffer is currently stored. Every call must be
// paired with a later call to Release.
func (p *bytesPool) Acquire() []byte {
	p.slots <- struct{}{}
	return p.pop()
}

// TryAcquire is the non-blocking variant of Acquire. If a slot is immediately
// available it returns the result of popping a buffer (possibly nil) and true;
// otherwise it returns nil and false and the caller must not call Release.
func (p *bytesPool) TryAcquire() ([]byte, bool) {
	select {
	case p.slots <- struct{}{}:
		return p.pop(), true
	default:
		return nil, false
	}
}

// pop removes and returns the buffer on top of the stack, or returns nil if
// the stack is empty.
func (p *bytesPool) pop() []byte {
	p.bytesLock.Lock()
	defer p.bytesLock.Unlock()

	numBytes := len(p.bytes)
	if numBytes == 0 {
		return nil
	}

	last := numBytes - 1
	b := p.bytes[last]
	// Clear the vacated entry so the stack's backing array does not keep the
	// buffer reachable after ownership has moved to the caller.
	p.bytes[last] = nil
	p.bytes = p.bytes[:last]
	return b
}

// Release hands b back to the pool and frees the slot taken by a prior
// Acquire or successful TryAcquire. b may be nil.
//
// It panics if no slot is currently held. Note that b has already been stored
// in the pool by the time the panic is raised.
func (p *bytesPool) Release(b []byte) {
	// Store the buffer before freeing the slot. If the slot were freed first,
	// a goroutine unblocked by it could pop before b is pushed and miss the
	// buffer it is supposed to reuse.
	p.bytesLock.Lock()
	p.bytes = append(p.bytes, b)
	p.bytesLock.Unlock()

	select {
	case <-p.slots:
	default:
		panic("release of unacquired semaphore")
	}
}

// setLength returns a slice of length size that reuses b's backing array when
// it is large enough.
//
// If size <= cap(b), the result is b resliced to size; bytes between the old
// length and size are whatever the backing array already held. Otherwise the
// result is a grown slice whose first cap(b) bytes come from b's backing array
// and whose remaining bytes are zero.
func setLength(b []byte, size int) []byte {
	if size <= cap(b) {
		return b[:size]
	}
	return append(b[:cap(b)], make([]byte, size-cap(b))...)
}
46 changes: 46 additions & 0 deletions x/merkledb/bytes_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package merkledb

import "testing"

// Benchmark_BytesPool_Acquire times Acquire on a pool created with b.N slots,
// so none of the b.N timed calls has to wait for a slot.
func Benchmark_BytesPool_Acquire(b *testing.B) {
	pool := newBytesPool(b.N)

	b.ResetTimer()
	for remaining := b.N; remaining > 0; remaining-- {
		pool.Acquire()
	}
}

// Benchmark_BytesPool_Release times Release of a nil buffer. Every slot is
// acquired during untimed setup so that each timed Release has a slot to free.
func Benchmark_BytesPool_Release(b *testing.B) {
	pool := newBytesPool(b.N)
	for taken := 0; taken < b.N; taken++ {
		pool.Acquire()
	}

	b.ResetTimer()
	for remaining := b.N; remaining > 0; remaining-- {
		pool.Release(nil)
	}
}

// Benchmark_BytesPool_TryAcquire_Success times TryAcquire on a pool created
// with b.N slots, so each of the b.N timed calls finds a free slot.
func Benchmark_BytesPool_TryAcquire_Success(b *testing.B) {
	pool := newBytesPool(b.N)

	b.ResetTimer()
	for remaining := b.N; remaining > 0; remaining-- {
		pool.TryAcquire()
	}
}

// Benchmark_BytesPool_TryAcquire_Failure times TryAcquire when no slot is
// free: the pool has a single slot, which is taken before the timer starts.
func Benchmark_BytesPool_TryAcquire_Failure(b *testing.B) {
	pool := newBytesPool(1)
	pool.Acquire()

	b.ResetTimer()
	for remaining := b.N; remaining > 0; remaining-- {
		pool.TryAcquire()
	}
}
24 changes: 12 additions & 12 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/exp/maps"
"golang.org/x/sync/semaphore"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
Expand Down Expand Up @@ -216,9 +215,10 @@ type merkleDB struct {
// Valid children of this trie.
childViews []*view

// calculateNodeIDsSema controls the number of goroutines inside
// [calculateNodeIDsHelper] at any given time.
calculateNodeIDsSema *semaphore.Weighted
// hashNodesKeyPool controls the number of goroutines that are created
// inside [hashChangedNode] at any given time and provides slices for the
// keys needed while hashing.
hashNodesKeyPool *bytesPool

tokenSize int
}
Expand All @@ -242,9 +242,9 @@ func newDatabase(
return nil, err
}

rootGenConcurrency := uint(runtime.NumCPU())
rootGenConcurrency := runtime.NumCPU()
if config.RootGenConcurrency != 0 {
rootGenConcurrency = config.RootGenConcurrency
rootGenConcurrency = int(config.RootGenConcurrency)
}

// Share a sync.Pool of []byte between the intermediateNodeDB and valueNodeDB
Expand All @@ -270,12 +270,12 @@ func newDatabase(
bufferPool,
metrics,
int(config.ValueNodeCacheSize)),
history: newTrieHistory(int(config.HistoryLength)),
debugTracer: getTracerIfEnabled(config.TraceLevel, DebugTrace, config.Tracer),
infoTracer: getTracerIfEnabled(config.TraceLevel, InfoTrace, config.Tracer),
childViews: make([]*view, 0, defaultPreallocationSize),
calculateNodeIDsSema: semaphore.NewWeighted(int64(rootGenConcurrency)),
tokenSize: BranchFactorToTokenSize[config.BranchFactor],
history: newTrieHistory(int(config.HistoryLength)),
debugTracer: getTracerIfEnabled(config.TraceLevel, DebugTrace, config.Tracer),
infoTracer: getTracerIfEnabled(config.TraceLevel, InfoTrace, config.Tracer),
childViews: make([]*view, 0, defaultPreallocationSize),
hashNodesKeyPool: newBytesPool(rootGenConcurrency),
tokenSize: BranchFactorToTokenSize[config.BranchFactor],
}

if err := trieDB.initializeRoot(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion x/merkledb/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type changeSummary struct {
// The ID of the trie after these changes.
rootID ids.ID
// The root before/after this change.
// Set in [calculateNodeIDs].
// Set in [applyValueChanges].
rootChange change[maybe.Maybe[*node]]
nodes map[Key]*change[*node]
values map[Key]*change[maybe.Maybe[[]byte]]
Expand Down
16 changes: 8 additions & 8 deletions x/merkledb/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
func getNodeValue(t Trie, key string) ([]byte, error) {
path := ToKey([]byte(key))
if asView, ok := t.(*view); ok {
if err := asView.calculateNodeIDs(context.Background()); err != nil {
if err := asView.applyValueChanges(context.Background()); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestVisitPathToKey(t *testing.T) {
require.NoError(err)
require.IsType(&view{}, trieIntf)
trie = trieIntf.(*view)
require.NoError(trie.calculateNodeIDs(context.Background()))
require.NoError(trie.applyValueChanges(context.Background()))

nodePath = make([]*node, 0, 1)
require.NoError(visitPathToKey(trie, ToKey(key1), func(n *node) error {
Expand All @@ -156,7 +156,7 @@ func TestVisitPathToKey(t *testing.T) {
require.NoError(err)
require.IsType(&view{}, trieIntf)
trie = trieIntf.(*view)
require.NoError(trie.calculateNodeIDs(context.Background()))
require.NoError(trie.applyValueChanges(context.Background()))

nodePath = make([]*node, 0, 2)
require.NoError(visitPathToKey(trie, ToKey(key2), func(n *node) error {
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestVisitPathToKey(t *testing.T) {
require.NoError(err)
require.IsType(&view{}, trieIntf)
trie = trieIntf.(*view)
require.NoError(trie.calculateNodeIDs(context.Background()))
require.NoError(trie.applyValueChanges(context.Background()))

// Trie is:
// []
Expand Down Expand Up @@ -775,7 +775,7 @@ func Test_Trie_ChainDeletion(t *testing.T) {
)
require.NoError(err)

require.NoError(newTrie.(*view).calculateNodeIDs(context.Background()))
require.NoError(newTrie.(*view).applyValueChanges(context.Background()))
maybeRoot := newTrie.getRoot()
require.NoError(err)
require.True(maybeRoot.HasValue())
Expand All @@ -794,7 +794,7 @@ func Test_Trie_ChainDeletion(t *testing.T) {
},
)
require.NoError(err)
require.NoError(newTrie.(*view).calculateNodeIDs(context.Background()))
require.NoError(newTrie.(*view).applyValueChanges(context.Background()))

// trie should be empty
root := newTrie.getRoot()
Expand Down Expand Up @@ -861,7 +861,7 @@ func Test_Trie_NodeCollapse(t *testing.T) {
)
require.NoError(err)

require.NoError(trie.(*view).calculateNodeIDs(context.Background()))
require.NoError(trie.(*view).applyValueChanges(context.Background()))

for _, kv := range kvs {
node, err := trie.getEditableNode(ToKey(kv.Key), true)
Expand All @@ -888,7 +888,7 @@ func Test_Trie_NodeCollapse(t *testing.T) {
)
require.NoError(err)

require.NoError(trie.(*view).calculateNodeIDs(context.Background()))
require.NoError(trie.(*view).applyValueChanges(context.Background()))

for _, kv := range deletedKVs {
_, err := trie.getEditableNode(ToKey(kv.Key), true)
Expand Down
Loading
Loading