diff --git a/benchmark_test.go b/benchmark_test.go index 345174a..224e981 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -34,7 +34,7 @@ import ( func BenchmarkBalanceTransfer(b *testing.B) { const ( - numOfAddresses = 50_000_000 + numOfAddresses = 5_000_000 txsPerCommit = 20_000 balance = 100_000 ) diff --git a/space/address.go b/space/address.go index 397f536..db44e4a 100644 --- a/space/address.go +++ b/space/address.go @@ -1,6 +1,8 @@ package space import ( + "sync/atomic" + "github.com/outofforest/quantum/types" ) @@ -23,3 +25,11 @@ func isPointer(address types.VolatileAddress) bool { func isData(address types.VolatileAddress) bool { return !isFree(address) && !isPointer(address) } + +func load(address *types.VolatileAddress) types.VolatileAddress { + return (types.VolatileAddress)(atomic.LoadUint64((*uint64)(address))) +} + +func store(address *types.VolatileAddress, value types.VolatileAddress) { + atomic.StoreUint64((*uint64)(address), (uint64)(value)) +} diff --git a/space/space.go b/space/space.go index e9d9e13..80b07c8 100644 --- a/space/space.go +++ b/space/space.go @@ -78,7 +78,7 @@ func (s *Space[K, V]) Find(v *Entry[K, V], key K, stage uint8) { s.initEntry(v, key, keyHash, stage) } - s.find(v) + s.find(v, load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)) } // Query queries the key. @@ -93,9 +93,10 @@ func (s *Space[K, V]) Query(key K) (V, bool) { func (s *Space[K, V]) KeyExists( v *Entry[K, V], ) bool { - s.detectUpdate(v) + volatileAddress := v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress + s.detectUpdate(v, volatileAddress) - s.find(v) + s.find(v, volatileAddress) return v.exists } @@ -104,9 +105,10 @@ func (s *Space[K, V]) KeyExists( func (s *Space[K, V]) ReadKey( v *Entry[K, V], ) V { - s.detectUpdate(v) + volatileAddress := v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress + s.detectUpdate(v, volatileAddress) - s.find(v) + s.find(v, volatileAddress) if v.keyHashP == nil || *v.keyHashP == 0 { return s.defaultValue @@ -120,9 +122,10 @@ func (s *Space[K, V]) DeleteKey( v *Entry[K, V], tx *pipeline.TransactionRequest, ) { - s.detectUpdate(v) + volatileAddress := v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress + s.detectUpdate(v, volatileAddress) - s.find(v) + s.find(v, volatileAddress) if v.keyHashP != nil && *v.keyHashP != 0 { // If we are here it means `s.find` found the slot with matching key so don't need to check hash and key again. @@ -144,9 +147,10 @@ func (s *Space[K, V]) SetKey( ) error { v.item.Value = value - s.detectUpdate(v) + volatileAddress := v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress + s.detectUpdate(v, volatileAddress) - return s.set(v, tx, allocator) + return s.set(v, volatileAddress, tx, allocator) } // Stats returns space-related statistics. @@ -177,7 +181,7 @@ func (s *Space[K, V]) Stats() (uint64, uint64, uint64, uint64, float64) { pointerNodes++ pointerNode := ProjectPointerNode(s.config.State.Node(n)) for pi := range pointerNode.Pointers { - volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress) + volatileAddress := pointerNode.Pointers[pi].VolatileAddress if isFree(volatileAddress) { continue } @@ -248,8 +252,8 @@ func (s *Space[K, V]) initEntry( v.stage = stage } -func (s *Space[K, V]) find(v *Entry[K, V]) { - s.walkPointers(v) +func (s *Space[K, V]) find(v *Entry[K, V], volatileAddress types.VolatileAddress) { + volatileAddress = s.walkPointers(v, volatileAddress) switch { case v.stage == StageData: @@ -261,7 +265,6 @@ func (s *Space[K, V]) find(v *Entry[K, V]) { return } - volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress) if !isData(volatileAddress) { v.keyHashP = nil v.itemP = nil @@ -270,32 +273,31 @@ func (s *Space[K, V]) find(v *Entry[K, V]) { return } - s.walkDataItems(v) + s.walkDataItems(v, volatileAddress) } func (s *Space[K, V]) set( v *Entry[K, V], + volatileAddress types.VolatileAddress, tx *pipeline.TransactionRequest, volatileAllocator *alloc.Allocator[types.VolatileAddress], ) error { - s.walkPointers(v) - - volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress) + volatileAddress = s.walkPointers(v, volatileAddress) if isFree(volatileAddress) { - dataNodeVolatileAddress, err := volatileAllocator.Allocate() + var err error + volatileAddress, err = volatileAllocator.Allocate() if err != nil { return err } - s.config.State.Clear(dataNodeVolatileAddress) + s.config.State.Clear(volatileAddress) - types.Store(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress, - dataNodeVolatileAddress) + store(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress, volatileAddress) } // Starting from here the data node is allocated. - conflict := s.walkDataItems(v) + conflict := s.walkDataItems(v, volatileAddress) if v.keyHashP != nil { if *v.keyHashP == 0 { @@ -314,7 +316,7 @@ func (s *Space[K, V]) set( // Try to split data node. if v.storeRequest.PointersToStore > 1 { splitDone, err := s.splitDataNodeWithoutConflict(tx, volatileAllocator, v.parentIndex, - v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level) + v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.VolatileAddress, v.level) if err != nil { return err } @@ -324,16 +326,19 @@ func (s *Space[K, V]) set( v.level-- v.nextDataNode = nil - return s.set(v, tx, volatileAllocator) + volatileAddress = v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress + return s.set(v, volatileAddress, tx, volatileAllocator) } } // Add pointer node. - if err := s.addPointerNode(v, tx, volatileAllocator, conflict); err != nil { + var err error + volatileAddress, err = s.addPointerNode(v, tx, volatileAllocator, conflict) + if err != nil { return err } - return s.set(v, tx, volatileAllocator) + return s.set(v, volatileAddress, tx, volatileAllocator) } func (s *Space[K, V]) splitToIndex(parentNodeAddress types.VolatileAddress, index uint64) (uint64, uint64) { @@ -347,7 +352,7 @@ func (s *Space[K, V]) splitToIndex(parentNodeAddress types.VolatileAddress, inde for mask > 1 { mask >>= 1 newIndex := index | mask - if isFree(types.Load(&parentNode.Pointers[newIndex].VolatileAddress)) { + if isFree(parentNode.Pointers[newIndex].VolatileAddress) { index = newIndex break } @@ -360,10 +365,10 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict( tx *pipeline.TransactionRequest, volatileAllocator *alloc.Allocator[types.VolatileAddress], index uint64, - parentNodePointer *types.Pointer, + parentNodeAddress types.VolatileAddress, level uint8, ) (bool, error) { - newIndex, mask := s.splitToIndex(types.Load(&parentNodePointer.VolatileAddress), index) + newIndex, mask := s.splitToIndex(parentNodeAddress, index) if newIndex == index { return false, nil } @@ -374,9 +379,9 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict( } s.config.State.Clear(newNodeVolatileAddress) - parentNode := ProjectPointerNode(s.config.State.Node(types.Load(&parentNodePointer.VolatileAddress))) + parentNode := ProjectPointerNode(s.config.State.Node(parentNodeAddress)) existingNodePointer := &parentNode.Pointers[index] - existingDataNode := s.config.State.Node(types.Load(&existingNodePointer.VolatileAddress)) + existingDataNode := s.config.State.Node(existingNodePointer.VolatileAddress) newDataNode := s.config.State.Node(newNodeVolatileAddress) keyHashes := s.config.DataNodeAssistant.KeyHashes(existingDataNode) newKeyHashes := s.config.DataNodeAssistant.KeyHashes(newDataNode) @@ -397,7 +402,7 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict( keyHashes[i] = 0 } - types.Store(&parentNode.Pointers[newIndex].VolatileAddress, newNodeVolatileAddress) + store(&parentNode.Pointers[newIndex].VolatileAddress, newNodeVolatileAddress) tx.AddStoreRequest(&pipeline.StoreRequest{ Store: [pipeline.StoreCapacity]types.NodeRoot{ @@ -427,10 +432,10 @@ func (s *Space[K, V]) splitDataNodeWithConflict( tx *pipeline.TransactionRequest, volatileAllocator *alloc.Allocator[types.VolatileAddress], index uint64, - parentNodePointer *types.Pointer, + parentNodeAddress types.VolatileAddress, level uint8, ) (bool, error) { - newIndex, mask := s.splitToIndex(types.Load(&parentNodePointer.VolatileAddress), index) + newIndex, mask := s.splitToIndex(parentNodeAddress, index) if newIndex == index { return false, nil } @@ -441,9 +446,9 @@ func (s *Space[K, V]) splitDataNodeWithConflict( } s.config.State.Clear(newNodeVolatileAddress) - parentNode := ProjectPointerNode(s.config.State.Node(types.Load(&parentNodePointer.VolatileAddress))) + parentNode := ProjectPointerNode(s.config.State.Node(parentNodeAddress)) existingNodePointer := &parentNode.Pointers[index] - existingDataNode := s.config.State.Node(types.Load(&existingNodePointer.VolatileAddress)) + existingDataNode := s.config.State.Node(existingNodePointer.VolatileAddress) newDataNode := s.config.State.Node(newNodeVolatileAddress) keyHashes := s.config.DataNodeAssistant.KeyHashes(existingDataNode) newKeyHashes := s.config.DataNodeAssistant.KeyHashes(newDataNode) @@ -468,7 +473,7 @@ func (s *Space[K, V]) splitDataNodeWithConflict( keyHashes[i] = 0 } - types.Store(&parentNode.Pointers[newIndex].VolatileAddress, newNodeVolatileAddress) + store(&parentNode.Pointers[newIndex].VolatileAddress, newNodeVolatileAddress) tx.AddStoreRequest(&pipeline.StoreRequest{ Store: [pipeline.StoreCapacity]types.NodeRoot{ @@ -499,10 +504,10 @@ func (s *Space[K, V]) addPointerNode( tx *pipeline.TransactionRequest, volatileAllocator *alloc.Allocator[types.VolatileAddress], conflict bool, -) error { +) (types.VolatileAddress, error) { pointerNodeVolatileAddress, err := volatileAllocator.Allocate() if err != nil { - return err + return 0, err } s.config.State.Clear(pointerNodeVolatileAddress) @@ -512,26 +517,29 @@ func (s *Space[K, V]) addPointerNode( pointerNode.Pointers[0].SnapshotID = dataPointer.SnapshotID pointerNode.Pointers[0].PersistentAddress = dataPointer.PersistentAddress - types.Store(&pointerNode.Pointers[0].VolatileAddress, dataPointer.VolatileAddress) + store(&pointerNode.Pointers[0].VolatileAddress, dataPointer.VolatileAddress) pointerNodeVolatileAddress = pointerNodeVolatileAddress.Set(flagPointerNode) if conflict { pointerNodeVolatileAddress = pointerNodeVolatileAddress.Set(flagHashMod) } - types.Store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress) + store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress) if conflict { - _, err = s.splitDataNodeWithConflict(tx, volatileAllocator, 0, pointerNodeRoot.Pointer, v.level+1) + _, err = s.splitDataNodeWithConflict(tx, volatileAllocator, 0, pointerNodeVolatileAddress, v.level+1) } else { - _, err = s.splitDataNodeWithoutConflict(tx, volatileAllocator, 0, pointerNodeRoot.Pointer, v.level+1) + _, err = s.splitDataNodeWithoutConflict(tx, volatileAllocator, 0, pointerNodeVolatileAddress, v.level+1) + } + if err != nil { + return 0, err } - return err + return pointerNodeVolatileAddress, nil } -func (s *Space[K, V]) walkPointers(v *Entry[K, V]) { +func (s *Space[K, V]) walkPointers(v *Entry[K, V], volatileAddress types.VolatileAddress) types.VolatileAddress { if v.nextDataNode != nil { - if isFree(types.Load(v.nextDataNode)) { - return + if isFree(load(v.nextDataNode)) { + return volatileAddress } v.storeRequest.PointersToStore-- @@ -540,21 +548,24 @@ func (s *Space[K, V]) walkPointers(v *Entry[K, V]) { v.keyHashP = nil v.itemP = nil + + volatileAddress = load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress) } + var stop bool for { - if !s.walkOnePointer(v) { - return + if volatileAddress, stop = s.walkOnePointer(v, volatileAddress); stop { + return volatileAddress } } } func (s *Space[K, V]) walkOnePointer( v *Entry[K, V], -) bool { - volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress) + volatileAddress types.VolatileAddress, +) (types.VolatileAddress, bool) { if !isPointer(volatileAddress) { - return false + return volatileAddress, true } if volatileAddress.IsSet(flagHashMod) { v.keyHash = s.hashKeyFunc(&v.item.Key, s.hashBuff, v.level) @@ -573,17 +584,17 @@ func (s *Space[K, V]) walkOnePointer( // If we are here it means that we should stop following potential next pointer nodes because we are in the // non-final branch. A better data node might be added concurrently at any time. - return false + return pointerNode.Pointers[index].VolatileAddress, true } if v.stage == StagePointer0 && v.level == 3 { - return false + return pointerNode.Pointers[index].VolatileAddress, true } - return true + return pointerNode.Pointers[index].VolatileAddress, false } -func (s *Space[K, V]) walkDataItems(v *Entry[K, V]) bool { +func (s *Space[K, V]) walkDataItems(v *Entry[K, V], dataNodeAddress types.VolatileAddress) bool { if v.keyHashP != nil { return false } @@ -591,8 +602,7 @@ func (s *Space[K, V]) walkDataItems(v *Entry[K, V]) bool { v.exists = false var conflict bool - node := s.config.State.Node(types.Load( - &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)) + node := s.config.State.Node(dataNodeAddress) keyHashes := s.config.DataNodeAssistant.KeyHashes(node) zeroIndex, numOfMatches := compare.Compare(uint64(v.keyHash), (*uint64)(&keyHashes[0]), &s.hashMatches[0], @@ -622,14 +632,14 @@ func (s *Space[K, V]) walkDataItems(v *Entry[K, V]) bool { return conflict } -func (s *Space[K, V]) detectUpdate(v *Entry[K, V]) { +func (s *Space[K, V]) detectUpdate(v *Entry[K, V], volatileAddress types.VolatileAddress) { v.stage = StageData if v.keyHashP == nil { return } - if isPointer(types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)) || + if isPointer(volatileAddress) || *s.config.DeletionCounter != v.deletionCounter || (*v.keyHashP != 0 && (*v.keyHashP != v.keyHash || v.itemP.Key != v.item.Key)) { v.keyHashP = nil @@ -796,7 +806,7 @@ var pointerHops = [NumOfPointers][]uint64{ } func reducePointerSlot(pointerNode *PointerNode, index uint64) (uint64, uint64) { - if !isFree(types.Load(&pointerNode.Pointers[index].VolatileAddress)) { + if !isFree(load(&pointerNode.Pointers[index].VolatileAddress)) { return index, index } @@ -812,7 +822,7 @@ func reducePointerSlot(pointerNode *PointerNode, index uint64) (uint64, uint64) hopIndex := (hopEnd-hopStart)/2 + hopStart newIndex := hops[hopIndex] - volatileAddress := types.Load(&pointerNode.Pointers[newIndex].VolatileAddress) + volatileAddress := load(&pointerNode.Pointers[newIndex].VolatileAddress) switch { case isFree(volatileAddress): if !dataFound { diff --git a/space/test.go b/space/test.go index 5e7f069..a4adf9d 100644 --- a/space/test.go +++ b/space/test.go @@ -98,17 +98,18 @@ func (s *SpaceTest[K, V]) SplitDataNode(v *Entry[K, V], conflict bool) error { var err error if conflict { _, err = s.s.splitDataNodeWithConflict(s.tx, s.allocator, v.parentIndex, - v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level) + v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.VolatileAddress, v.level) } else { _, err = s.s.splitDataNodeWithoutConflict(s.tx, s.allocator, v.parentIndex, - v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level) + v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.VolatileAddress, v.level) } return err } // AddPointerNode adds pointer node. func (s *SpaceTest[K, V]) AddPointerNode(v *Entry[K, V], conflict bool) error { - return s.s.addPointerNode(v, s.tx, s.allocator, conflict) + _, err := s.s.addPointerNode(v, s.tx, s.allocator, conflict) + return err } // Query queries the space for a key. @@ -118,5 +119,5 @@ func (s *SpaceTest[K, V]) Query(key TestKey[K]) (V, bool) { // Find finds the location in the tree for key. func (s *SpaceTest[K, V]) Find(v *Entry[K, V]) { - s.s.find(v) + s.s.find(v, v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress) } diff --git a/types/types.go b/types/types.go index 408bc81..4f2e113 100644 --- a/types/types.go +++ b/types/types.go @@ -2,7 +2,6 @@ package types import ( "math" - "sync/atomic" ) const ( @@ -53,16 +52,6 @@ func (na VolatileAddress) Set(flag VolatileAddress) VolatileAddress { return na | flag } -// Load loads node address atomically. -func Load(address *VolatileAddress) VolatileAddress { - return (VolatileAddress)(atomic.LoadUint64((*uint64)(address))) -} - -// Store stores node address atomically. -func Store(address *VolatileAddress, value VolatileAddress) { - atomic.StoreUint64((*uint64)(address), (uint64)(value)) -} - const ( numOfFlags = 2