From ee6e6632282d336bbc9a677cd59034568b03cfe5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Wojciech=20Ma=C5=82ota-W=C3=B3jcik?= <59281144+outofforest@users.noreply.github.com>
Date: Fri, 13 Dec 2024 06:51:44 +0100
Subject: [PATCH] Don't store the address of the space, as it might change over time (#298)

---
 alloc/allocator.go   |  5 ++-
 alloc/mem.go         |  2 ++
 alloc/state.go       |  2 +-
 benchmark_test.go    | 13 +++----
 db.go                | 62 ++++++++++++++++----------------
 pipeline/pipeline.go |  1 -
 space/space.go       | 84 ++++++++++++++++++++++----------------------
 types/types.go       |  9 ++---
 8 files changed, 90 insertions(+), 88 deletions(-)

diff --git a/alloc/allocator.go b/alloc/allocator.go
index 7faf61a..a92a4df 100644
--- a/alloc/allocator.go
+++ b/alloc/allocator.go
@@ -98,7 +98,10 @@ type Deallocator[A Address] struct {
 
 // Deallocate deallocates single node.
 func (d *Deallocator[A]) Deallocate(nodeAddress A) {
-	d.release = append(d.release, nodeAddress)
+	// ANDing with FlagNaked is done to erase flags from volatile addresses.
+	// Persistent addresses don't have flags, and values that large are impossible there anyway.
+	d.release = append(d.release, nodeAddress&types.FlagNaked)
+
 	if len(d.release) == cap(d.release) {
 		d.sinkCh <- d.release
diff --git a/alloc/mem.go b/alloc/mem.go
index f9530b4..6d3295c 100644
--- a/alloc/mem.go
+++ b/alloc/mem.go
@@ -43,6 +43,8 @@ func Allocate(size, alignment uint64, useHugePages bool) (unsafe.Pointer, func()
 			// 1GB hugepages.
 			_ = unmap(dataPOrig, allocatedSize, 1024*1024*1024)
+
+			return
 		}
 
 		// Standard pages.
diff --git a/alloc/state.go b/alloc/state.go
index 84165b6..bb2b269 100644
--- a/alloc/state.go
+++ b/alloc/state.go
@@ -102,7 +102,7 @@ func (s *State) VolatileSize() uint64 {
 
 // Node returns node bytes.
 func (s *State) Node(nodeAddress types.VolatileAddress) unsafe.Pointer {
-	return unsafe.Add(s.dataP, nodeAddress.Naked()*types.NodeLength)
+	return unsafe.Add(s.dataP, nodeAddress*types.NodeLength)
 }
 
 // Bytes returns byte slice of a node.
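The Deallocate change above batches flag-free addresses: the mask strips flag bits before an address joins the release batch, and a full batch is handed to the sink channel. Below is a minimal, self-contained sketch of that batching pattern; the stand-in types and the assumption of two flag bits are mine, not the library's actual generics.

package main

import "fmt"

// Assumed layout for this sketch: two flag bits at the top of a 64-bit address.
const flagNaked = ^uint64(0) >> 2

// deallocator batches naked (flag-free) addresses and flushes full batches.
type deallocator struct {
	release []uint64
	sinkCh  chan []uint64
}

func (d *deallocator) deallocate(addr uint64) {
	// Strip flag bits so consumers see plain addresses, as the patch does
	// with nodeAddress&types.FlagNaked.
	d.release = append(d.release, addr&flagNaked)

	if len(d.release) == cap(d.release) {
		d.sinkCh <- d.release
		// The real code recycles buffers; a fresh slice keeps the sketch simple.
		d.release = make([]uint64, 0, cap(d.release))
	}
}

func main() {
	d := &deallocator{release: make([]uint64, 0, 2), sinkCh: make(chan []uint64, 1)}
	const someFlag = uint64(1) << 63
	d.deallocate(42 | someFlag) // flag is erased before queueing
	d.deallocate(7)
	fmt.Println(<-d.sinkCh) // [42 7]
}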
diff --git a/benchmark_test.go b/benchmark_test.go
index f111350..cbfa8fd 100644
--- a/benchmark_test.go
+++ b/benchmark_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/outofforest/quantum/tx/genesis"
 	"github.com/outofforest/quantum/tx/transfer"
 	txtypes "github.com/outofforest/quantum/tx/types"
+	"github.com/outofforest/quantum/tx/types/spaces"
 	"github.com/outofforest/quantum/types"
 )
 
@@ -28,9 +29,11 @@ import (
 // go tool pprof -http="localhost:8000" pprofbin ./profile.out
 // go test -c -o bench ./benchmark_test.go
 
+// * soft memlock unlimited
+// * hard memlock unlimited
+
 func BenchmarkBalanceTransfer(b *testing.B) {
 	const (
-		spaceID        = 0x00
 		numOfAddresses = 5_000_000
 		txsPerCommit   = 20_000
 		balance        = 100_000
@@ -66,7 +69,7 @@ func BenchmarkBalanceTransfer(b *testing.B) {
 		panic(err)
 	}
 
-	var size uint64 = 5 * 1024 * 1024 * 1024
+	var size uint64 = 1 * 1024 * 1024 * 1024
 	state, stateDeallocFunc, err := alloc.NewState(
 		size, store.Size(), 100,
@@ -95,11 +98,9 @@ func BenchmarkBalanceTransfer(b *testing.B) {
 		}
 	}()
 
-	defer func() {
-		db.Close()
-	}()
+	defer db.Close()
 
-	s, err := quantum.GetSpace[txtypes.Account, txtypes.Amount](spaceID, db)
+	s, err := quantum.GetSpace[txtypes.Account, txtypes.Amount](spaces.Balances, db)
 	if err != nil {
 		panic(err)
 	}
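The new comment in the benchmark preamble points at memlock limits, which locked and hugepage-backed mappings require. Assuming the usual Linux mechanism, the matching /etc/security/limits.conf entries would look like the following; the user name is a placeholder, and a re-login is needed before the limits apply.

# /etc/security/limits.conf (placeholder user)
benchuser soft memlock unlimited
benchuser hard memlock unlimited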
diff --git a/db.go b/db.go
index 163b999..566e845 100644
--- a/db.go
+++ b/db.go
@@ -253,36 +253,34 @@ func (db *DB) deleteSnapshot(
 		},
 	)
 
-	if nextSnapshotInfo.DeallocationRoot.VolatileAddress != types.FreeAddress {
-		for nextDeallocSnapshot := range space.IteratorAndDeallocator(
-			nextSnapshotInfo.DeallocationRoot,
-			db.config.State,
-			deallocationNodeAssistant,
-			volatileDeallocator,
-			persistentDeallocator,
-		) {
-			if nextDeallocSnapshot.Key.SnapshotID > snapshotInfo.PreviousSnapshotID &&
-				nextDeallocSnapshot.Key.SnapshotID <= snapshotID {
-				if err := list.Deallocate(nextDeallocSnapshot.Value, db.config.State, volatileDeallocator,
-					persistentDeallocator); err != nil {
-					return err
-				}
-
-				continue
-			}
-
-			var deallocationListValue space.Entry[deallocationKey, list.Pointer]
-			deallocationLists.Find(&deallocationListValue, nextDeallocSnapshot.Key, space.StageData, deallocationHashBuff,
-				deallocationHashMatches)
-			if err := deallocationListValue.Set(
-				tx,
-				volatileAllocator,
-				nextDeallocSnapshot.Value,
-				deallocationHashBuff,
-				deallocationHashMatches,
-			); err != nil {
+	for nextDeallocSnapshot := range space.IteratorAndDeallocator(
+		nextSnapshotInfo.DeallocationRoot,
+		db.config.State,
+		deallocationNodeAssistant,
+		volatileDeallocator,
+		persistentDeallocator,
+	) {
+		if nextDeallocSnapshot.Key.SnapshotID > snapshotInfo.PreviousSnapshotID &&
+			nextDeallocSnapshot.Key.SnapshotID <= snapshotID {
+			if err := list.Deallocate(nextDeallocSnapshot.Value, db.config.State, volatileDeallocator,
+				persistentDeallocator); err != nil {
 				return err
 			}
+
+			continue
+		}
+
+		var deallocationListValue space.Entry[deallocationKey, list.Pointer]
+		deallocationLists.Find(&deallocationListValue, nextDeallocSnapshot.Key, space.StageData, deallocationHashBuff,
+			deallocationHashMatches)
+		if err := deallocationListValue.Set(
+			tx,
+			volatileAllocator,
+			nextDeallocSnapshot.Value,
+			deallocationHashBuff,
+			deallocationHashMatches,
+		); err != nil {
+			return err
 		}
 	}
 
@@ -571,7 +569,8 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read
 			if root.Pointer.SnapshotID != db.singularityNode.LastSnapshotID {
 				if root.Pointer.PersistentAddress != 0 {
 					listNodePointer, err := db.deallocateNode(root.Pointer.SnapshotID, root.Pointer.PersistentAddress,
-						deallocationListsToCommit, volatileAllocator, persistentAllocator, persistentDeallocator)
+						deallocationListsToCommit, volatileAllocator, persistentAllocator, persistentDeallocator,
+						sr.NoSnapshots)
 					if err != nil {
 						return err
 					}
@@ -859,7 +858,7 @@ func (db *DB) storeNodes(ctx context.Context, pipeReader *pipeline.Reader) error
 	}
 
 	for sr := req.StoreRequest; sr != nil; sr = sr.Next {
-		for i := sr.PointersToStore - 1; i >= sr.PointersLast; i-- {
+		for i := sr.PointersToStore - 1; i >= 0; i-- {
 			pointer := sr.Store[i].Pointer
 
 			if pointer.Revision == uintptr(unsafe.Pointer(sr)) {
@@ -885,10 +884,11 @@ func (db *DB) deallocateNode(
 	volatileAllocator *alloc.Allocator[types.VolatileAddress],
 	persistentAllocator *alloc.Allocator[types.PersistentAddress],
 	persistentDeallocator *alloc.Deallocator[types.PersistentAddress],
+	immediateDeallocation bool,
 ) (list.Pointer, error) {
 	// Latest persistent snapshot cannot be deleted, so there is no gap between that snapshot and the pending one.
 	// It means the condition here doesn't need to include snapshot IDs greater than the previous snapshot ID.
-	if nodeSnapshotID == db.singularityNode.LastSnapshotID {
+	if nodeSnapshotID == db.singularityNode.LastSnapshotID || immediateDeallocation {
 		persistentDeallocator.Deallocate(nodeAddress)
 
 		return list.Pointer{}, nil
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index e65f9e2..f611ccd 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -82,7 +82,6 @@ type StoreRequest struct {
 	NoSnapshots     bool
 	PointersToStore int8
 	Store           [StoreCapacity]types.NodeRoot
-	PointersLast    int8
 	Next            *StoreRequest
 }
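The new immediateDeallocation argument, fed from StoreRequest.NoSnapshots further up, lets nodes of spaces that never take part in snapshots bypass the per-snapshot deallocation lists. A condensed sketch of that branch, with plain stand-in types instead of the real ones:

package main

import "fmt"

// decideDeallocation mirrors the branch added to db.deallocateNode: nodes
// written in the latest persistent snapshot, and nodes of NoSnapshots
// spaces, are released immediately; everything else is parked on a
// per-snapshot deallocation list until the owning snapshot is deleted.
func decideDeallocation(nodeSnapshotID, lastSnapshotID uint64, immediateDeallocation bool) string {
	if nodeSnapshotID == lastSnapshotID || immediateDeallocation {
		return "deallocate now"
	}
	return "append to deallocation list"
}

func main() {
	fmt.Println(decideDeallocation(9, 9, false)) // deallocate now
	fmt.Println(decideDeallocation(3, 9, true))  // deallocate now (NoSnapshots space)
	fmt.Println(decideDeallocation(3, 9, false)) // append to deallocation list
}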
diff --git a/space/space.go b/space/space.go
index 5526d23..31ca7ea 100644
--- a/space/space.go
+++ b/space/space.go
@@ -39,7 +39,6 @@ func New[K, V comparable](config Config[K, V]) *Space[K, V] {
 	}
 
 	defaultInit := Entry[K, V]{
-		space: s,
 		storeRequest: pipeline.StoreRequest{
 			NoSnapshots:     s.config.NoSnapshots,
 			PointersToStore: 1,
@@ -98,48 +97,48 @@ func (s *Space[K, V]) Query(key K, hashBuff []byte, hashMatches []uint64) (V, bo
 }
 
 // Stats returns space-related statistics.
-func (s *Space[K, V]) Stats() (uint64, uint64, uint64, float64) {
-	switch {
-	case isFree(s.config.SpaceRoot.Pointer.VolatileAddress):
-		return 0, 0, 0, 0
-	case !isPointer(s.config.SpaceRoot.Pointer.VolatileAddress):
-		return 1, 0, 1, 0
+func (s *Space[K, V]) Stats() (uint64, uint64, uint64, uint64, float64) {
+	if isFree(s.config.SpaceRoot.Pointer.VolatileAddress) {
+		return 0, 0, 0, 0, 0
 	}
 
 	stack := []types.VolatileAddress{s.config.SpaceRoot.Pointer.VolatileAddress}
 	levels := map[types.VolatileAddress]uint64{
-		s.config.SpaceRoot.Pointer.VolatileAddress: 1,
+		s.config.SpaceRoot.Pointer.VolatileAddress: 0,
 	}
 	var maxLevel, pointerNodes, dataNodes, dataItems uint64
 
 	for {
 		if len(stack) == 0 {
-			return maxLevel, pointerNodes, dataNodes, float64(dataItems) / float64(dataNodes*s.numOfDataItems)
+			return maxLevel, pointerNodes, dataNodes, dataItems, float64(dataItems) / float64(dataNodes*s.numOfDataItems)
 		}
 
 		n := stack[len(stack)-1]
 		level := levels[n] + 1
-		pointerNodes++
 		stack = stack[:len(stack)-1]
 
-		pointerNode := ProjectPointerNode(s.config.State.Node(n))
-		for pi := range pointerNode.Pointers {
-			volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress)
-			switch {
-			case isFree(volatileAddress):
-			case isPointer(volatileAddress):
+		switch {
+		case isPointer(n):
+			pointerNodes++
+			pointerNode := ProjectPointerNode(s.config.State.Node(n))
+			for pi := range pointerNode.Pointers {
+				volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress)
+				if isFree(volatileAddress) {
+					continue
+				}
+
 				stack = append(stack, volatileAddress)
 				levels[volatileAddress] = level
-			default:
-				dataNodes++
-				if level > maxLevel {
-					maxLevel = level
-				}
-				for _, kh := range s.config.DataNodeAssistant.KeyHashes(s.config.State.Node(volatileAddress)) {
-					if kh != 0 {
-						dataItems++
-					}
+			}
+		case !isFree(n):
+			dataNodes++
+			if level > maxLevel {
+				maxLevel = level
+			}
+			for _, kh := range s.config.DataNodeAssistant.KeyHashes(s.config.State.Node(n)) {
+				if kh != 0 {
+					dataItems++
 				}
 			}
 		}
 	}
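Stats now reports the number of data items as its own return value instead of only folding it into the fill factor. A hedged usage sketch, assuming a space s obtained via GetSpace as in the benchmark; the variable names are illustrative:

// Illustrative only; s.Stats() follows the new five-value signature.
maxLevel, pointerNodes, dataNodes, dataItems, fill := s.Stats()
fmt.Printf("levels=%d pointerNodes=%d dataNodes=%d items=%d fill=%.2f\n",
	maxLevel, pointerNodes, dataNodes, dataItems, fill)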
@@ -196,6 +195,7 @@ func (s *Space[K, V]) initEntry(
 ) {
 	initBytes := unsafe.Slice((*byte)(unsafe.Pointer(v)), s.initSize)
 	copy(initBytes, s.defaultInit)
+	v.space = s
 	v.keyHash = keyHash
 	v.item.Key = key
 	v.stage = stage
@@ -302,7 +302,7 @@ func (s *Space[K, V]) find(
 func (s *Space[K, V]) set(
 	v *Entry[K, V],
 	tx *pipeline.TransactionRequest,
-	allocator *alloc.Allocator[types.VolatileAddress],
+	volatileAllocator *alloc.Allocator[types.VolatileAddress],
 	hashBuff []byte,
 	hashMatches []uint64,
 	hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash,
@@ -312,7 +312,7 @@ func (s *Space[K, V]) set(
 	volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)
 
 	if isFree(volatileAddress) {
-		dataNodeVolatileAddress, err := allocator.Allocate()
+		dataNodeVolatileAddress, err := volatileAllocator.Allocate()
 		if err != nil {
 			return err
 		}
@@ -342,7 +342,7 @@ func (s *Space[K, V]) set(
 
 	// Try to split data node.
 	if v.storeRequest.PointersToStore > 1 {
-		splitDone, err := s.splitDataNodeWithoutConflict(tx, allocator, v.parentIndex,
+		splitDone, err := s.splitDataNodeWithoutConflict(tx, volatileAllocator, v.parentIndex,
 			v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level)
 		if err != nil {
 			return err
@@ -353,16 +353,16 @@ func (s *Space[K, V]) set(
 			v.level--
 			v.nextDataNode = nil
 
-			return s.set(v, tx, allocator, hashBuff, hashMatches, hashKeyFunc)
+			return s.set(v, tx, volatileAllocator, hashBuff, hashMatches, hashKeyFunc)
 		}
 	}
 
 	// Add pointer node.
-	if err := s.addPointerNode(v, tx, allocator, conflict, hashBuff, hashKeyFunc); err != nil {
+	if err := s.addPointerNode(v, tx, volatileAllocator, conflict, hashBuff, hashKeyFunc); err != nil {
 		return err
 	}
 
-	return s.set(v, tx, allocator, hashBuff, hashMatches, hashKeyFunc)
+	return s.set(v, tx, volatileAllocator, hashBuff, hashMatches, hashKeyFunc)
 }
 
 func (s *Space[K, V]) splitToIndex(parentNodeAddress types.VolatileAddress, index uint64) (uint64, uint64) {
@@ -387,7 +387,7 @@ func (s *Space[K, V]) splitToIndex(parentNodeAddress types.VolatileAddress, inde
 
 func (s *Space[K, V]) splitDataNodeWithoutConflict(
 	tx *pipeline.TransactionRequest,
-	allocator *alloc.Allocator[types.VolatileAddress],
+	volatileAllocator *alloc.Allocator[types.VolatileAddress],
 	index uint64,
 	parentNodePointer *types.Pointer,
 	level uint8,
@@ -397,7 +397,7 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict(
 		return false, nil
 	}
 
-	newNodeVolatileAddress, err := allocator.Allocate()
+	newNodeVolatileAddress, err := volatileAllocator.Allocate()
 	if err != nil {
 		return false, err
 	}
@@ -454,7 +454,7 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict(
 
 func (s *Space[K, V]) splitDataNodeWithConflict(
 	tx *pipeline.TransactionRequest,
-	allocator *alloc.Allocator[types.VolatileAddress],
+	volatileAllocator *alloc.Allocator[types.VolatileAddress],
 	index uint64,
 	parentNodePointer *types.Pointer,
 	level uint8,
@@ -466,7 +466,7 @@ func (s *Space[K, V]) splitDataNodeWithConflict(
 		return false, nil
 	}
 
-	newNodeVolatileAddress, err := allocator.Allocate()
+	newNodeVolatileAddress, err := volatileAllocator.Allocate()
 	if err != nil {
 		return false, err
 	}
@@ -528,12 +528,12 @@ func (s *Space[K, V]) splitDataNodeWithConflict(
 func (s *Space[K, V]) addPointerNode(
 	v *Entry[K, V],
 	tx *pipeline.TransactionRequest,
-	allocator *alloc.Allocator[types.VolatileAddress],
+	volatileAllocator *alloc.Allocator[types.VolatileAddress],
 	conflict bool,
 	hashBuff []byte,
 	hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash,
 ) error {
-	pointerNodeVolatileAddress, err := allocator.Allocate()
+	pointerNodeVolatileAddress, err := volatileAllocator.Allocate()
 	if err != nil {
 		return err
 	}
@@ -554,10 +554,10 @@ func (s *Space[K, V]) addPointerNode(
 	types.Store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress)
 
 	if conflict {
-		_, err = s.splitDataNodeWithConflict(tx, allocator, 0, pointerNodeRoot.Pointer,
+		_, err = s.splitDataNodeWithConflict(tx, volatileAllocator, 0, pointerNodeRoot.Pointer,
 			v.level+1, hashBuff, hashKeyFunc)
 	} else {
-		_, err = s.splitDataNodeWithoutConflict(tx, allocator, 0, pointerNodeRoot.Pointer, v.level+1)
+		_, err = s.splitDataNodeWithoutConflict(tx, volatileAllocator, 0, pointerNodeRoot.Pointer, v.level+1)
 	}
 	return err
 }
@@ -679,9 +679,9 @@ func (s *Space[K, V]) detectUpdate(v *Entry[K, V]) {
 
 // Entry represents entry in the space.
 type Entry[K, V comparable] struct {
-	space        *Space[K, V]
 	storeRequest pipeline.StoreRequest
 
+	space    *Space[K, V]
 	itemP    *DataItem[K, V]
 	keyHashP *types.KeyHash
 	keyHash  types.KeyHash
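initEntry now copies the byte template first and assigns the space pointer afterwards, so the template itself never holds an address that might change, which is the point of this patch. A generic illustration of that copy-then-patch initialization pattern, using simplified types rather than the library's:

package main

import (
	"fmt"
	"unsafe"
)

type entry struct {
	tag   uint64
	owner *string // set after the byte copy, never part of the template
}

func main() {
	template := entry{tag: 42}
	// Byte image of the template, captured once up front.
	templBytes := unsafe.Slice((*byte)(unsafe.Pointer(&template)), unsafe.Sizeof(template))

	owner := "space"
	var e entry
	// Copy the template bytes, then patch in the pointer field, mirroring
	// copy(initBytes, s.defaultInit) followed by v.space = s.
	copy(unsafe.Slice((*byte)(unsafe.Pointer(&e)), unsafe.Sizeof(e)), templBytes)
	e.owner = &owner

	fmt.Println(e.tag, *e.owner) // 42 space
}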
@@ -763,8 +763,8 @@ func IteratorAndDeallocator[K, V comparable](
 			stackCount--
 			pointer := stack[stackCount]
 
-			// It is safe to do deallocations here because nodes are not reallocated until commit is finalized.
-			volatileDeallocator.Deallocate(pointer.VolatileAddress.Naked())
+			// It is safe to deallocate here because nodes are not reallocated until the commit is finalized.
+			volatileDeallocator.Deallocate(pointer.VolatileAddress)
 			persistentDeallocator.Deallocate(pointer.PersistentAddress)
 
 			switch {
diff --git a/types/types.go b/types/types.go
index 403a6a1..408bc81 100644
--- a/types/types.go
+++ b/types/types.go
@@ -53,11 +53,6 @@ func (na VolatileAddress) Set(flag VolatileAddress) VolatileAddress {
 	return na | flag
 }
 
-// Naked returns address without flags.
-func (na VolatileAddress) Naked() VolatileAddress {
-	return na & FlagNaked
-}
-
 // Load loads node address atomically.
 func Load(address *VolatileAddress) VolatileAddress {
 	return (VolatileAddress)(atomic.LoadUint64((*uint64)(address)))
@@ -75,7 +70,9 @@ const (
 	FreeAddress VolatileAddress = 0
 
 	// FlagNaked is used to retrieve address without flags.
-	FlagNaked VolatileAddress = math.MaxUint64 >> numOfFlags
+	// Funny fact: in most cases the flags are erased automatically, because multiplying the address
+	// by NodeLength shifts the flag bits out of the uint64 range.
+	FlagNaked = math.MaxUint64 >> numOfFlags
 )
 
 // Pointer is the pointer to another block.
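The rewritten FlagNaked comment can be checked concretely: with flags kept in the topmost bits, multiplying by the node length, a power of two, shifts them past bit 63 and out of the 64-bit product. A small demonstration, assuming a 4096-byte node length and two flag bits purely for illustration (the real constants live in the types package):

package main

import "fmt"

const (
	nodeLength = uint64(4096) // assumed node size; a power of two
	numOfFlags = 2            // assumed number of flag bits
	flagNaked  = ^uint64(0) >> numOfFlags
	flag       = uint64(1) << 63 // a flag stored in the top bit
)

func main() {
	addr := uint64(12345) | flag

	// Multiplying by a power of two is a left shift; the flag bits are
	// pushed beyond bit 63 and wrap out of the 64-bit product.
	fmt.Println(addr*nodeLength == (addr&flagNaked)*nodeLength) // true

	// Explicit masking achieves the same, which is what Deallocate now does.
	fmt.Println(addr & flagNaked) // 12345
}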