Skip to content

Commit

Permalink
Simplify splitDataNode method (#269)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Dec 3, 2024
1 parent d80c6f9 commit 014da76
Showing 1 changed file with 28 additions and 52 deletions.
80 changes: 28 additions & 52 deletions space/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,27 +535,12 @@ func (s *Space[K, V]) set(

// Try to split data node.
if v.storeRequest.PointersToStore > 1 {
newIndex, mask := s.splitToIndex(
types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.VolatileAddress),
v.parentIndex,
)

if newIndex != v.parentIndex {
if err := s.splitDataNode(
snapshotID,
tx,
walRecorder,
allocator,
v.parentIndex,
newIndex,
mask,
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer,
v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer,
v.level,
); err != nil {
return err
}

splitDone, err := s.splitDataNode(snapshotID, tx, walRecorder, allocator, v.parentIndex,
v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level)
if err != nil {
return err
}
if splitDone {
// This must be done because the item to set might go to the newly created data node.
v.storeRequest.PointersToStore--
v.level--
Expand Down Expand Up @@ -598,31 +583,34 @@ func (s *Space[K, V]) splitDataNode(
walRecorder *wal.Recorder,
allocator *alloc.Allocator,
index uint64,
newIndex uint64,
mask uint64,
parentNodePointer, existingNodePointer *types.Pointer,
parentNodePointer *types.Pointer,
level uint8,
) error {
parentNode := ProjectPointerNode(s.config.State.Node(types.Load(&parentNodePointer.VolatileAddress).Naked()))
) (bool, error) {
newIndex, mask := s.splitToIndex(types.Load(&parentNodePointer.VolatileAddress), index)
if newIndex == index {
return false, nil
}

newNodeVolatileAddress, err := allocator.Allocate()
if err != nil {
return err
return false, err
}
s.config.State.Clear(newNodeVolatileAddress)

newNodePersistentAddress, err := allocator.Allocate()
if err != nil {
return err
return false, err
}
s.config.State.Clear(newNodePersistentAddress)

parentNode := ProjectPointerNode(s.config.State.Node(types.Load(&parentNodePointer.VolatileAddress).Naked()))
existingNodePointer := &parentNode.Pointers[index]
existingDataNode := s.config.State.Node(types.Load(&existingNodePointer.VolatileAddress))
newDataNode := s.config.State.Node(newNodeVolatileAddress)

node := s.config.State.Node(types.Load(&existingNodePointer.VolatileAddress))
keyHashes := s.config.DataNodeAssistant.KeyHashes(node)
keyHashes := s.config.DataNodeAssistant.KeyHashes(existingDataNode)
newKeyHashes := s.config.DataNodeAssistant.KeyHashes(newDataNode)
for i, item := range s.config.DataNodeAssistant.Iterator(node) {

for i, item := range s.config.DataNodeAssistant.Iterator(existingDataNode) {
itemIndex := PointerIndex(keyHashes[i], level-1)
if itemIndex&mask != newIndex {
continue
Expand All @@ -632,31 +620,31 @@ func (s *Space[K, V]) splitDataNode(
if err := wal.Copy(walRecorder, tx, newNodePersistentAddress, existingNodePointer.PersistentAddress,
newDataNodeItem, item,
); err != nil {
return err
return false, err
}
if err := wal.Set1(walRecorder, tx, newNodePersistentAddress,
&newKeyHashes[i], &keyHashes[i],
); err != nil {
return err
return false, err
}
if err := wal.Set1(walRecorder, tx, existingNodePointer.PersistentAddress,
&keyHashes[i], zeroKeyHashPtr,
); err != nil {
return err
return false, err
}
}

if err := wal.Set2(walRecorder, tx, parentNodePointer.PersistentAddress,
&parentNode.Pointers[newIndex].SnapshotID, &snapshotID,
&parentNode.Pointers[newIndex].PersistentAddress, &newNodePersistentAddress,
); err != nil {
return err
return false, err
}

if err := wal.SetAddressAtomically(walRecorder, tx, parentNodePointer.PersistentAddress,
&parentNode.Pointers[newIndex].VolatileAddress, &newNodeVolatileAddress,
); err != nil {
return err
return false, err
}

tx.AddStoreRequest(&pipeline.StoreRequest{
Expand All @@ -680,7 +668,7 @@ func (s *Space[K, V]) splitDataNode(
NoSnapshots: s.config.NoSnapshots,
})

return nil
return true, nil
}

func (s *Space[K, V]) addPointerNode(
Expand Down Expand Up @@ -744,20 +732,8 @@ func (s *Space[K, V]) addPointerNode(
types.Store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress)
}

newIndex, mask := s.splitToIndex(pointerNodeVolatileAddress, 0)

return s.splitDataNode(
snapshotID,
tx,
walRecorder,
allocator,
0,
newIndex,
mask,
pointerNodeRoot.Pointer,
&pointerNode.Pointers[0],
v.level+1,
)
_, err = s.splitDataNode(snapshotID, tx, walRecorder, allocator, 0, pointerNodeRoot.Pointer, v.level+1)
return err
}

var pointerHops = [NumOfPointers][]uint64{
Expand Down

0 comments on commit 014da76

Please sign in to comment.