Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest committed Dec 13, 2024
1 parent 2d8f202 commit 4e4d4af
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 58 deletions.
10 changes: 10 additions & 0 deletions space/address.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package space

import (
"sync/atomic"

"github.com/outofforest/quantum/types"
)

Expand All @@ -23,3 +25,11 @@ func isPointer(address types.VolatileAddress) bool {
func isData(address types.VolatileAddress) bool {
return !isFree(address) && !isPointer(address)
}

func load(address *types.VolatileAddress) types.VolatileAddress {
return (types.VolatileAddress)(atomic.LoadUint64((*uint64)(address)))
}

func store(address *types.VolatileAddress, value types.VolatileAddress) {
atomic.StoreUint64((*uint64)(address), (uint64)(value))
}
88 changes: 43 additions & 45 deletions space/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (s *Space[K, V]) Stats() (uint64, uint64, uint64, uint64, float64) {
pointerNodes++
pointerNode := ProjectPointerNode(s.config.State.Node(n))
for pi := range pointerNode.Pointers {
volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress)
volatileAddress := pointerNode.Pointers[pi].VolatileAddress
if isFree(volatileAddress) {
continue
}
Expand Down Expand Up @@ -249,7 +249,7 @@ func (s *Space[K, V]) initEntry(
}

func (s *Space[K, V]) find(v *Entry[K, V]) {
s.walkPointers(v)
volatileAddress := s.walkPointers(v)

switch {
case v.stage == StageData:
Expand All @@ -261,7 +261,6 @@ func (s *Space[K, V]) find(v *Entry[K, V]) {
return
}

volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)
if !isData(volatileAddress) {
v.keyHashP = nil
v.itemP = nil
Expand All @@ -270,32 +269,30 @@ func (s *Space[K, V]) find(v *Entry[K, V]) {
return
}

s.walkDataItems(v)
s.walkDataItems(v, volatileAddress)
}

func (s *Space[K, V]) set(
v *Entry[K, V],
tx *pipeline.TransactionRequest,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
) error {
s.walkPointers(v)

volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)
volatileAddress := s.walkPointers(v)

if isFree(volatileAddress) {
dataNodeVolatileAddress, err := volatileAllocator.Allocate()
var err error
volatileAddress, err = volatileAllocator.Allocate()
if err != nil {
return err
}
s.config.State.Clear(dataNodeVolatileAddress)
s.config.State.Clear(volatileAddress)

types.Store(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress,
dataNodeVolatileAddress)
store(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress, volatileAddress)
}

// Starting from here the data node is allocated.

conflict := s.walkDataItems(v)
conflict := s.walkDataItems(v, volatileAddress)

if v.keyHashP != nil {
if *v.keyHashP == 0 {
Expand All @@ -314,7 +311,7 @@ func (s *Space[K, V]) set(
// Try to split data node.
if v.storeRequest.PointersToStore > 1 {
splitDone, err := s.splitDataNodeWithoutConflict(tx, volatileAllocator, v.parentIndex,
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level)
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.VolatileAddress, v.level)
if err != nil {
return err
}
Expand Down Expand Up @@ -347,7 +344,7 @@ func (s *Space[K, V]) splitToIndex(parentNodeAddress types.VolatileAddress, inde
for mask > 1 {
mask >>= 1
newIndex := index | mask
if isFree(types.Load(&parentNode.Pointers[newIndex].VolatileAddress)) {
if isFree(parentNode.Pointers[newIndex].VolatileAddress) {
index = newIndex
break
}
Expand All @@ -360,10 +357,10 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict(
tx *pipeline.TransactionRequest,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
index uint64,
parentNodePointer *types.Pointer,
parentNodeAddress types.VolatileAddress,
level uint8,
) (bool, error) {
newIndex, mask := s.splitToIndex(types.Load(&parentNodePointer.VolatileAddress), index)
newIndex, mask := s.splitToIndex(parentNodeAddress, index)
if newIndex == index {
return false, nil
}
Expand All @@ -374,9 +371,9 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict(
}
s.config.State.Clear(newNodeVolatileAddress)

parentNode := ProjectPointerNode(s.config.State.Node(types.Load(&parentNodePointer.VolatileAddress)))
parentNode := ProjectPointerNode(s.config.State.Node(parentNodeAddress))
existingNodePointer := &parentNode.Pointers[index]
existingDataNode := s.config.State.Node(types.Load(&existingNodePointer.VolatileAddress))
existingDataNode := s.config.State.Node(existingNodePointer.VolatileAddress)
newDataNode := s.config.State.Node(newNodeVolatileAddress)
keyHashes := s.config.DataNodeAssistant.KeyHashes(existingDataNode)
newKeyHashes := s.config.DataNodeAssistant.KeyHashes(newDataNode)
Expand All @@ -397,7 +394,7 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict(
keyHashes[i] = 0
}

types.Store(&parentNode.Pointers[newIndex].VolatileAddress, newNodeVolatileAddress)
store(&parentNode.Pointers[newIndex].VolatileAddress, newNodeVolatileAddress)

tx.AddStoreRequest(&pipeline.StoreRequest{
Store: [pipeline.StoreCapacity]types.NodeRoot{
Expand Down Expand Up @@ -427,10 +424,10 @@ func (s *Space[K, V]) splitDataNodeWithConflict(
tx *pipeline.TransactionRequest,
volatileAllocator *alloc.Allocator[types.VolatileAddress],
index uint64,
parentNodePointer *types.Pointer,
parentNodeAddress types.VolatileAddress,
level uint8,
) (bool, error) {
newIndex, mask := s.splitToIndex(types.Load(&parentNodePointer.VolatileAddress), index)
newIndex, mask := s.splitToIndex(parentNodeAddress, index)
if newIndex == index {
return false, nil
}
Expand All @@ -441,9 +438,9 @@ func (s *Space[K, V]) splitDataNodeWithConflict(
}
s.config.State.Clear(newNodeVolatileAddress)

parentNode := ProjectPointerNode(s.config.State.Node(types.Load(&parentNodePointer.VolatileAddress)))
parentNode := ProjectPointerNode(s.config.State.Node(parentNodeAddress))
existingNodePointer := &parentNode.Pointers[index]
existingDataNode := s.config.State.Node(types.Load(&existingNodePointer.VolatileAddress))
existingDataNode := s.config.State.Node(existingNodePointer.VolatileAddress)
newDataNode := s.config.State.Node(newNodeVolatileAddress)
keyHashes := s.config.DataNodeAssistant.KeyHashes(existingDataNode)
newKeyHashes := s.config.DataNodeAssistant.KeyHashes(newDataNode)
Expand All @@ -468,7 +465,7 @@ func (s *Space[K, V]) splitDataNodeWithConflict(
keyHashes[i] = 0
}

types.Store(&parentNode.Pointers[newIndex].VolatileAddress, newNodeVolatileAddress)
store(&parentNode.Pointers[newIndex].VolatileAddress, newNodeVolatileAddress)

tx.AddStoreRequest(&pipeline.StoreRequest{
Store: [pipeline.StoreCapacity]types.NodeRoot{
Expand Down Expand Up @@ -512,26 +509,26 @@ func (s *Space[K, V]) addPointerNode(

pointerNode.Pointers[0].SnapshotID = dataPointer.SnapshotID
pointerNode.Pointers[0].PersistentAddress = dataPointer.PersistentAddress
types.Store(&pointerNode.Pointers[0].VolatileAddress, dataPointer.VolatileAddress)
store(&pointerNode.Pointers[0].VolatileAddress, dataPointer.VolatileAddress)

pointerNodeVolatileAddress = pointerNodeVolatileAddress.Set(flagPointerNode)
if conflict {
pointerNodeVolatileAddress = pointerNodeVolatileAddress.Set(flagHashMod)
}
types.Store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress)
store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress)

if conflict {
_, err = s.splitDataNodeWithConflict(tx, volatileAllocator, 0, pointerNodeRoot.Pointer, v.level+1)
_, err = s.splitDataNodeWithConflict(tx, volatileAllocator, 0, pointerNodeVolatileAddress, v.level+1)
} else {
_, err = s.splitDataNodeWithoutConflict(tx, volatileAllocator, 0, pointerNodeRoot.Pointer, v.level+1)
_, err = s.splitDataNodeWithoutConflict(tx, volatileAllocator, 0, pointerNodeVolatileAddress, v.level+1)
}
return err
}

func (s *Space[K, V]) walkPointers(v *Entry[K, V]) {
func (s *Space[K, V]) walkPointers(v *Entry[K, V]) types.VolatileAddress {
if v.nextDataNode != nil {
if isFree(types.Load(v.nextDataNode)) {
return
if isFree(load(v.nextDataNode)) {
return load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)
}

v.storeRequest.PointersToStore--
Expand All @@ -542,19 +539,21 @@ func (s *Space[K, V]) walkPointers(v *Entry[K, V]) {
v.itemP = nil
}

lastVolatileAddress := load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)
var stop bool
for {
if !s.walkOnePointer(v) {
return
if lastVolatileAddress, stop = s.walkOnePointer(v, lastVolatileAddress); stop {
return lastVolatileAddress
}
}
}

func (s *Space[K, V]) walkOnePointer(
v *Entry[K, V],
) bool {
volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)
volatileAddress types.VolatileAddress,
) (types.VolatileAddress, bool) {
if !isPointer(volatileAddress) {
return false
return volatileAddress, true
}
if volatileAddress.IsSet(flagHashMod) {
v.keyHash = s.hashKeyFunc(&v.item.Key, s.hashBuff, v.level)
Expand All @@ -573,26 +572,25 @@ func (s *Space[K, V]) walkOnePointer(

// If we are here it means that we should stop following potential next pointer nodes because we are in the
// non-final branch. A better data node might be added concurrently at any time.
return false
return pointerNode.Pointers[index].VolatileAddress, true
}

if v.stage == StagePointer0 && v.level == 3 {
return false
return pointerNode.Pointers[index].VolatileAddress, true
}

return true
return pointerNode.Pointers[index].VolatileAddress, false
}

func (s *Space[K, V]) walkDataItems(v *Entry[K, V]) bool {
func (s *Space[K, V]) walkDataItems(v *Entry[K, V], dataNodeAddress types.VolatileAddress) bool {
if v.keyHashP != nil {
return false
}

v.exists = false

var conflict bool
node := s.config.State.Node(types.Load(
&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress))
node := s.config.State.Node(dataNodeAddress)
keyHashes := s.config.DataNodeAssistant.KeyHashes(node)

zeroIndex, numOfMatches := compare.Compare(uint64(v.keyHash), (*uint64)(&keyHashes[0]), &s.hashMatches[0],
Expand Down Expand Up @@ -629,7 +627,7 @@ func (s *Space[K, V]) detectUpdate(v *Entry[K, V]) {
return
}

if isPointer(types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)) ||
if isPointer(v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress) ||
*s.config.DeletionCounter != v.deletionCounter ||
(*v.keyHashP != 0 && (*v.keyHashP != v.keyHash || v.itemP.Key != v.item.Key)) {
v.keyHashP = nil
Expand Down Expand Up @@ -796,7 +794,7 @@ var pointerHops = [NumOfPointers][]uint64{
}

func reducePointerSlot(pointerNode *PointerNode, index uint64) (uint64, uint64) {
if !isFree(types.Load(&pointerNode.Pointers[index].VolatileAddress)) {
if !isFree(load(&pointerNode.Pointers[index].VolatileAddress)) {
return index, index
}

Expand All @@ -812,7 +810,7 @@ func reducePointerSlot(pointerNode *PointerNode, index uint64) (uint64, uint64)
hopIndex := (hopEnd-hopStart)/2 + hopStart
newIndex := hops[hopIndex]

volatileAddress := types.Load(&pointerNode.Pointers[newIndex].VolatileAddress)
volatileAddress := load(&pointerNode.Pointers[newIndex].VolatileAddress)
switch {
case isFree(volatileAddress):
if !dataFound {
Expand Down
4 changes: 2 additions & 2 deletions space/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ func (s *SpaceTest[K, V]) SplitDataNode(v *Entry[K, V], conflict bool) error {
var err error
if conflict {
_, err = s.s.splitDataNodeWithConflict(s.tx, s.allocator, v.parentIndex,
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level)
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.VolatileAddress, v.level)
} else {
_, err = s.s.splitDataNodeWithoutConflict(s.tx, s.allocator, v.parentIndex,
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level)
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.VolatileAddress, v.level)
}
return err
}
Expand Down
11 changes: 0 additions & 11 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package types

import (
"math"
"sync/atomic"
)

const (
Expand Down Expand Up @@ -53,16 +52,6 @@ func (na VolatileAddress) Set(flag VolatileAddress) VolatileAddress {
return na | flag
}

// Load loads node address atomically.
func Load(address *VolatileAddress) VolatileAddress {
return (VolatileAddress)(atomic.LoadUint64((*uint64)(address)))
}

// Store stores node address atomically.
func Store(address *VolatileAddress, value VolatileAddress) {
atomic.StoreUint64((*uint64)(address), (uint64)(value))
}

const (
numOfFlags = 2

Expand Down

0 comments on commit 4e4d4af

Please sign in to comment.