diff --git a/space/space.go b/space/space.go index 7e76a69..4eeb633 100644 --- a/space/space.go +++ b/space/space.go @@ -535,27 +535,12 @@ func (s *Space[K, V]) set( // Try to split data node. if v.storeRequest.PointersToStore > 1 { - newIndex, mask := s.splitToIndex( - types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.VolatileAddress), - v.parentIndex, - ) - - if newIndex != v.parentIndex { - if err := s.splitDataNode( - snapshotID, - tx, - walRecorder, - allocator, - v.parentIndex, - newIndex, - mask, - v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, - v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer, - v.level, - ); err != nil { - return err - } - + splitDone, err := s.splitDataNode(snapshotID, tx, walRecorder, allocator, v.parentIndex, + v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level) + if err != nil { + return err + } + if splitDone { // This must be done because the item to set might go to the newly created data node. v.storeRequest.PointersToStore-- v.level-- @@ -598,31 +583,34 @@ func (s *Space[K, V]) splitDataNode( walRecorder *wal.Recorder, allocator *alloc.Allocator, index uint64, - newIndex uint64, - mask uint64, - parentNodePointer, existingNodePointer *types.Pointer, + parentNodePointer *types.Pointer, level uint8, -) error { - parentNode := ProjectPointerNode(s.config.State.Node(types.Load(&parentNodePointer.VolatileAddress).Naked())) +) (bool, error) { + newIndex, mask := s.splitToIndex(types.Load(&parentNodePointer.VolatileAddress), index) + if newIndex == index { + return false, nil + } newNodeVolatileAddress, err := allocator.Allocate() if err != nil { - return err + return false, err } s.config.State.Clear(newNodeVolatileAddress) newNodePersistentAddress, err := allocator.Allocate() if err != nil { - return err + return false, err } s.config.State.Clear(newNodePersistentAddress) + parentNode := ProjectPointerNode(s.config.State.Node(types.Load(&parentNodePointer.VolatileAddress).Naked())) + existingNodePointer := &parentNode.Pointers[index] + existingDataNode := s.config.State.Node(types.Load(&existingNodePointer.VolatileAddress)) newDataNode := s.config.State.Node(newNodeVolatileAddress) - - node := s.config.State.Node(types.Load(&existingNodePointer.VolatileAddress)) - keyHashes := s.config.DataNodeAssistant.KeyHashes(node) + keyHashes := s.config.DataNodeAssistant.KeyHashes(existingDataNode) newKeyHashes := s.config.DataNodeAssistant.KeyHashes(newDataNode) - for i, item := range s.config.DataNodeAssistant.Iterator(node) { + + for i, item := range s.config.DataNodeAssistant.Iterator(existingDataNode) { itemIndex := PointerIndex(keyHashes[i], level-1) if itemIndex&mask != newIndex { continue @@ -632,17 +620,17 @@ func (s *Space[K, V]) splitDataNode( if err := wal.Copy(walRecorder, tx, newNodePersistentAddress, existingNodePointer.PersistentAddress, newDataNodeItem, item, ); err != nil { - return err + return false, err } if err := wal.Set1(walRecorder, tx, newNodePersistentAddress, &newKeyHashes[i], &keyHashes[i], ); err != nil { - return err + return false, err } if err := wal.Set1(walRecorder, tx, existingNodePointer.PersistentAddress, &keyHashes[i], zeroKeyHashPtr, ); err != nil { - return err + return false, err } } @@ -650,13 +638,13 @@ func (s *Space[K, V]) splitDataNode( &parentNode.Pointers[newIndex].SnapshotID, &snapshotID, &parentNode.Pointers[newIndex].PersistentAddress, &newNodePersistentAddress, ); err != nil { - return err + return false, err } if err := wal.SetAddressAtomically(walRecorder, tx, parentNodePointer.PersistentAddress, &parentNode.Pointers[newIndex].VolatileAddress, &newNodeVolatileAddress, ); err != nil { - return err + return false, err } tx.AddStoreRequest(&pipeline.StoreRequest{ @@ -680,7 +668,7 @@ func (s *Space[K, V]) splitDataNode( NoSnapshots: s.config.NoSnapshots, }) - return nil + return true, nil } func (s *Space[K, V]) addPointerNode( @@ -744,20 +732,8 @@ func (s *Space[K, V]) addPointerNode( types.Store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress) } - newIndex, mask := s.splitToIndex(pointerNodeVolatileAddress, 0) - - return s.splitDataNode( - snapshotID, - tx, - walRecorder, - allocator, - 0, - newIndex, - mask, - pointerNodeRoot.Pointer, - &pointerNode.Pointers[0], - v.level+1, - ) + _, err = s.splitDataNode(snapshotID, tx, walRecorder, allocator, 0, pointerNodeRoot.Pointer, v.level+1) + return err } var pointerHops = [NumOfPointers][]uint64{