diff --git a/space/space.go b/space/space.go index 20734a5..0f42fb1 100644 --- a/space/space.go +++ b/space/space.go @@ -103,7 +103,7 @@ func (s *Space[K, V]) Find( v.dataItemIndex = dataItemIndex(v.keyHash, s.numOfDataItems) v.stage = stage - if s.config.SpaceRoot.Pointer.VolatileAddress != types.FreeAddress && + if types.Load(&s.config.SpaceRoot.Pointer.VolatileAddress) != types.FreeAddress && s.config.SpaceRoot.Pointer.SnapshotID != snapshotID { persistentAddress, err := allocator.Allocate() if err != nil { @@ -126,20 +126,20 @@ func (s *Space[K, V]) Find( // Query queries the key. func (s *Space[K, V]) Query(key K, hashBuff []byte, hashMatches []uint64) (V, bool) { - pointer := s.config.SpaceRoot.Pointer + volatileAddress := types.Load(&s.config.SpaceRoot.Pointer.VolatileAddress) var level uint8 keyHash := hashKey(&key, nil, level) - for pointer.VolatileAddress.IsSet(types.FlagPointerNode) { - if pointer.VolatileAddress.IsSet(types.FlagHashMod) { + for volatileAddress.IsSet(types.FlagPointerNode) { + if volatileAddress.IsSet(types.FlagHashMod) { keyHash = hashKey(&key, hashBuff, level) } - pointerNode := ProjectPointerNode(s.config.State.Node(pointer.VolatileAddress.Naked())) + pointerNode := ProjectPointerNode(s.config.State.Node(volatileAddress.Naked())) index := PointerIndex(keyHash, level) - state := pointerNode.Pointers[index].VolatileAddress.State() + candidateAddress := types.Load(&pointerNode.Pointers[index].VolatileAddress) - switch state { + switch candidateAddress.State() { case types.StateFree: hops := pointerHops[index] hopStart := 0 @@ -150,7 +150,8 @@ func (s *Space[K, V]) Query(key K, hashBuff []byte, hashMatches []uint64) (V, bo hopIndex := (hopEnd-hopStart)/2 + hopStart newIndex := hops[hopIndex] - switch pointerNode.Pointers[newIndex].VolatileAddress.State() { + nextCandidateAddress := types.Load(&pointerNode.Pointers[newIndex].VolatileAddress) + switch nextCandidateAddress.State() { case types.StateFree: if !dataFound { index = newIndex @@ -168,14 +169,14 @@ func (s *Space[K, V]) Query(key K, hashBuff []byte, hashMatches []uint64) (V, bo } level++ - pointer = &pointerNode.Pointers[index] + volatileAddress = types.Load(&pointerNode.Pointers[index].VolatileAddress) } - if pointer.VolatileAddress == types.FreeAddress { + if volatileAddress == types.FreeAddress { return s.defaultValue, false } - node := s.config.State.Node(pointer.VolatileAddress) + node := s.config.State.Node(volatileAddress) keyHashes := s.config.DataNodeAssistant.KeyHashes(node) _, numOfMatches := compare.Compare(uint64(keyHash), (*uint64)(&keyHashes[0]), &hashMatches[0], @@ -199,19 +200,20 @@ func (s *Space[K, V]) Iterator() func(func(item *types.DataItem[K, V]) bool) { } func (s *Space[K, V]) iterate(pointer *types.Pointer, yield func(item *types.DataItem[K, V]) bool) { - switch pointer.VolatileAddress.State() { + volatileAddress := types.Load(&pointer.VolatileAddress) + switch volatileAddress.State() { case types.StatePointer: - pointerNode := ProjectPointerNode(s.config.State.Node(pointer.VolatileAddress.Naked())) + pointerNode := ProjectPointerNode(s.config.State.Node(volatileAddress.Naked())) for pi := range pointerNode.Pointers { p := &pointerNode.Pointers[pi] - if p.VolatileAddress.State() == types.StateFree { + if types.Load(&p.VolatileAddress).State() == types.StateFree { continue } s.iterate(p, yield) } case types.StateData: - for _, item := range s.config.DataNodeAssistant.Iterator(s.config.State.Node(pointer.VolatileAddress)) { + for _, item := range s.config.DataNodeAssistant.Iterator(s.config.State.Node(volatileAddress)) { if !yield(item) { return } @@ -221,15 +223,16 @@ func (s *Space[K, V]) iterate(pointer *types.Pointer, yield func(item *types.Dat // Nodes returns list of nodes used by the space. func (s *Space[K, V]) Nodes() []types.NodeAddress { - switch s.config.SpaceRoot.Pointer.VolatileAddress.State() { + volatileAddress := types.Load(&s.config.SpaceRoot.Pointer.VolatileAddress) + switch volatileAddress.State() { case types.StateFree: return nil case types.StateData: - return []types.NodeAddress{s.config.SpaceRoot.Pointer.VolatileAddress} + return []types.NodeAddress{volatileAddress} } nodes := []types.NodeAddress{} - stack := []types.NodeAddress{s.config.SpaceRoot.Pointer.VolatileAddress} + stack := []types.NodeAddress{volatileAddress.Naked()} for { if len(stack) == 0 { @@ -246,12 +249,13 @@ func (s *Space[K, V]) Nodes() []types.NodeAddress { pointerNode := ProjectPointerNode(s.config.State.Node(pointerNodeAddress.Naked())) for pi := range pointerNode.Pointers { - switch pointerNode.Pointers[pi].VolatileAddress.State() { + volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress) + switch volatileAddress.State() { case types.StateFree: case types.StateData: - nodes = append(nodes, pointerNode.Pointers[pi].VolatileAddress) + nodes = append(nodes, volatileAddress) case types.StatePointer: - stack = append(stack, pointerNode.Pointers[pi].VolatileAddress) + stack = append(stack, volatileAddress.Naked()) } } } @@ -259,17 +263,18 @@ func (s *Space[K, V]) Nodes() []types.NodeAddress { // Stats returns stats about the space. func (s *Space[K, V]) Stats() (uint64, uint64, uint64, float64) { - switch s.config.SpaceRoot.Pointer.VolatileAddress.State() { + volatileAddress := types.Load(&s.config.SpaceRoot.Pointer.VolatileAddress) + switch volatileAddress.State() { case types.StateFree: return 0, 0, 0, 0 case types.StateData: return 1, 0, 1, 0 } - stack := []types.NodeAddress{s.config.SpaceRoot.Pointer.VolatileAddress} + stack := []types.NodeAddress{volatileAddress.Naked()} levels := map[types.NodeAddress]uint64{ - s.config.SpaceRoot.Pointer.VolatileAddress: 1, + volatileAddress.Naked(): 1, } var maxLevel, pointerNodes, dataNodes, dataItems uint64 @@ -285,7 +290,8 @@ func (s *Space[K, V]) Stats() (uint64, uint64, uint64, float64) { pointerNode := ProjectPointerNode(s.config.State.Node(n.Naked())) for pi := range pointerNode.Pointers { - switch pointerNode.Pointers[pi].VolatileAddress.State() { + volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress) + switch volatileAddress.State() { case types.StateFree: case types.StateData: dataNodes++ @@ -294,13 +300,13 @@ func (s *Space[K, V]) Stats() (uint64, uint64, uint64, float64) { } //nolint:gofmt,revive // looks like a bug in linter for _, _ = range s.config.DataNodeAssistant.Iterator(s.config.State.Node( - pointerNode.Pointers[pi].VolatileAddress, + volatileAddress, )) { dataItems++ } case types.StatePointer: - stack = append(stack, pointerNode.Pointers[pi].VolatileAddress) - levels[pointerNode.Pointers[pi].VolatileAddress] = level + stack = append(stack, volatileAddress.Naked()) + levels[volatileAddress.Naked()] = level } } } @@ -362,7 +368,8 @@ func (s *Space[K, V]) deleteValue( } switch { - case v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress == types.FreeAddress: + case types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer. + VolatileAddress) == types.FreeAddress: case v.keyHashP == nil || *v.keyHashP == 0: default: // If we are here it means `s.find` found the slot with matching key so don't need to check hash and key again. @@ -419,7 +426,8 @@ func (s *Space[K, V]) find( return nil } - if v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress.State() != types.StateData { + if types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress). + State() != types.StateData { v.keyHashP = nil v.itemP = nil v.exists = false @@ -445,8 +453,10 @@ func (s *Space[K, V]) set( return err } + volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress) + //nolint:nestif - if v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress == types.FreeAddress { + if volatileAddress == types.FreeAddress { dataNodeVolatileAddress, err := allocator.Allocate() if err != nil { return err @@ -460,18 +470,24 @@ func (s *Space[K, V]) set( s.config.State.Clear(dataNodePersistentAddress) if v.storeRequest.PointersToStore > 1 { - if err := wal.Set3(walRecorder, tx, + if err := wal.Set2(walRecorder, tx, v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.PersistentAddress, &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.SnapshotID, &snapshotID, - &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress, &dataNodeVolatileAddress, &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.PersistentAddress, &dataNodePersistentAddress, ); err != nil { return err } + + if err := wal.SetAddressAtomically(walRecorder, tx, + v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.PersistentAddress, + &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress, &dataNodeVolatileAddress, + ); err != nil { + return err + } } else { v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.SnapshotID = snapshotID - v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress = dataNodeVolatileAddress v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.PersistentAddress = dataNodePersistentAddress + types.Store(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress, dataNodeVolatileAddress) } } @@ -503,7 +519,7 @@ func (s *Space[K, V]) set( // Try to split data node. if v.storeRequest.PointersToStore > 1 { newIndex, mask := s.splitToIndex( - v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.VolatileAddress, + types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.VolatileAddress), v.parentIndex, ) @@ -550,7 +566,7 @@ func (s *Space[K, V]) splitToIndex(parentNodeAddress types.NodeAddress, index ui for mask > 1 { mask >>= 1 newIndex := index | mask - if parentNode.Pointers[newIndex].VolatileAddress == types.FreeAddress { + if types.Load(&parentNode.Pointers[newIndex].VolatileAddress) == types.FreeAddress { index = newIndex break } @@ -570,7 +586,7 @@ func (s *Space[K, V]) splitDataNode( parentNodePointer, existingNodePointer *types.Pointer, level uint8, ) error { - parentNode := ProjectPointerNode(s.config.State.Node(parentNodePointer.VolatileAddress.Naked())) + parentNode := ProjectPointerNode(s.config.State.Node(types.Load(&parentNodePointer.VolatileAddress).Naked())) newNodeVolatileAddress, err := allocator.Allocate() if err != nil { @@ -586,7 +602,7 @@ func (s *Space[K, V]) splitDataNode( newDataNode := s.config.State.Node(newNodeVolatileAddress) - node := s.config.State.Node(existingNodePointer.VolatileAddress) + node := s.config.State.Node(types.Load(&existingNodePointer.VolatileAddress)) keyHashes := s.config.DataNodeAssistant.KeyHashes(node) newKeyHashes := s.config.DataNodeAssistant.KeyHashes(newDataNode) for i, item := range s.config.DataNodeAssistant.Iterator(node) { @@ -613,14 +629,19 @@ func (s *Space[K, V]) splitDataNode( } } - if err := wal.Set3(walRecorder, tx, parentNodePointer.PersistentAddress, + if err := wal.Set2(walRecorder, tx, parentNodePointer.PersistentAddress, &parentNode.Pointers[newIndex].SnapshotID, &snapshotID, - &parentNode.Pointers[newIndex].VolatileAddress, &newNodeVolatileAddress, &parentNode.Pointers[newIndex].PersistentAddress, &newNodePersistentAddress, ); err != nil { return err } + if err := wal.SetAddressAtomically(walRecorder, tx, parentNodePointer.PersistentAddress, + &parentNode.Pointers[newIndex].VolatileAddress, &newNodeVolatileAddress, + ); err != nil { + return err + } + tx.AddStoreRequest(&pipeline.StoreRequest{ Store: [pipeline.StoreCapacity]types.NodeRoot{ { @@ -669,30 +690,41 @@ func (s *Space[K, V]) addPointerNode( pointerNode := ProjectPointerNode(s.config.State.Node(pointerNodeVolatileAddress)) pointerNodeRoot := &v.storeRequest.Store[v.storeRequest.PointersToStore-1] - if err := wal.Set3(walRecorder, tx, pointerNodePersistentAddress, + if err := wal.Set2(walRecorder, tx, pointerNodePersistentAddress, &pointerNode.Pointers[0].SnapshotID, &dataPointer.SnapshotID, - &pointerNode.Pointers[0].VolatileAddress, &dataPointer.VolatileAddress, &pointerNode.Pointers[0].PersistentAddress, &dataPointer.PersistentAddress, ); err != nil { return err } + if err := wal.SetAddressAtomically(walRecorder, tx, pointerNodePersistentAddress, + &pointerNode.Pointers[0].VolatileAddress, &dataPointer.VolatileAddress, + ); err != nil { + return err + } + pointerNodeVolatileAddress = pointerNodeVolatileAddress.Set(types.FlagPointerNode) if conflict { pointerNodeVolatileAddress = pointerNodeVolatileAddress.Set(types.FlagHashMod) } if v.storeRequest.PointersToStore > 1 { - if err := wal.Set2(walRecorder, tx, + if err := wal.Set1(walRecorder, tx, v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.PersistentAddress, - &pointerNodeRoot.Pointer.VolatileAddress, &pointerNodeVolatileAddress, &pointerNodeRoot.Pointer.PersistentAddress, &pointerNodePersistentAddress, ); err != nil { return err } + + if err := wal.SetAddressAtomically(walRecorder, tx, + v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.PersistentAddress, + &pointerNodeRoot.Pointer.VolatileAddress, &pointerNodeVolatileAddress, + ); err != nil { + return err + } } else { - pointerNodeRoot.Pointer.VolatileAddress = pointerNodeVolatileAddress pointerNodeRoot.Pointer.PersistentAddress = pointerNodePersistentAddress + types.Store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress) } newIndex, mask := s.splitToIndex(pointerNodeVolatileAddress, 0) @@ -786,16 +818,18 @@ func (s *Space[K, V]) walkPointers( v *Entry[K, V], hashBuff []byte, ) error { - for v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress.IsSet(types.FlagPointerNode) { - if v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress.IsSet(types.FlagHashMod) { + for { + volatileAddress := types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress) + if !volatileAddress.IsSet(types.FlagPointerNode) { + break + } + if volatileAddress.IsSet(types.FlagHashMod) { v.keyHash = hashKey(&v.item.Key, hashBuff, v.level) } - pointerNode := ProjectPointerNode(s.config.State.Node( - v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress.Naked(), - )) + pointerNode := ProjectPointerNode(s.config.State.Node(volatileAddress.Naked())) index := PointerIndex(v.keyHash, v.level) - state := pointerNode.Pointers[index].VolatileAddress.State() + state := types.Load(&pointerNode.Pointers[index].VolatileAddress).State() nextIndex := index originalIndex := index @@ -810,7 +844,7 @@ func (s *Space[K, V]) walkPointers( hopIndex := (hopEnd-hopStart)/2 + hopStart newIndex := hops[hopIndex] - switch pointerNode.Pointers[newIndex].VolatileAddress.State() { + switch types.Load(&pointerNode.Pointers[newIndex].VolatileAddress).State() { case types.StateFree: if !dataFound { index = newIndex @@ -840,7 +874,7 @@ func (s *Space[K, V]) walkPointers( } //nolint:nestif - if pointerNode.Pointers[index].VolatileAddress != types.FreeAddress && + if types.Load(&pointerNode.Pointers[index].VolatileAddress) != types.FreeAddress && pointerNode.Pointers[index].SnapshotID != snapshotID { persistentAddress, err := allocator.Allocate() if err != nil { @@ -893,7 +927,9 @@ func (s *Space[K, V]) walkDataItems(v *Entry[K, V], hashMatches []uint64) bool { v.exists = false var conflict bool - node := s.config.State.Node(v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress) + node := s.config.State.Node(types.Load( + &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress), + ) keyHashes := s.config.DataNodeAssistant.KeyHashes(node) zeroIndex, numOfMatches := compare.Compare(uint64(v.keyHash), (*uint64)(&keyHashes[0]), &hashMatches[0], @@ -1030,8 +1066,8 @@ func detectUpdate[K, V comparable](v *Entry[K, V]) { v.keyHashP = nil v.itemP = nil - case v.keyHashP != nil && (v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer. - VolatileAddress.State() != types.StateData || + case v.keyHashP != nil && (types.Load(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer. + VolatileAddress).State() != types.StateData || (*v.keyHashP != 0 && (*v.keyHashP != v.keyHash || v.itemP.Key != v.item.Key))): v.keyHashP = nil v.itemP = nil @@ -1044,11 +1080,12 @@ func Deallocate( deallocator *alloc.Deallocator, state *alloc.State, ) { - switch spaceRoot.VolatileAddress.State() { + volatileAddress := types.Load(&spaceRoot.VolatileAddress) + switch volatileAddress.State() { case types.StateFree: return case types.StateData: - deallocator.Deallocate(spaceRoot.VolatileAddress) + deallocator.Deallocate(volatileAddress) deallocator.Deallocate(spaceRoot.PersistentAddress) return } @@ -1061,18 +1098,20 @@ func deallocatePointerNode( deallocator *alloc.Deallocator, state *alloc.State, ) { - pointerNode := ProjectPointerNode(state.Node(pointer.VolatileAddress.Naked())) + volatileAddress := types.Load(&pointer.VolatileAddress) + pointerNode := ProjectPointerNode(state.Node(volatileAddress.Naked())) for pi := range pointerNode.Pointers { p := &pointerNode.Pointers[pi] - switch p.VolatileAddress.State() { + volatileAddress := types.Load(&p.VolatileAddress) + switch volatileAddress.State() { case types.StateData: - deallocator.Deallocate(pointer.VolatileAddress) + deallocator.Deallocate(volatileAddress) deallocator.Deallocate(p.PersistentAddress) case types.StatePointer: deallocatePointerNode(p, deallocator, state) } } - deallocator.Deallocate(pointer.VolatileAddress) + deallocator.Deallocate(volatileAddress) deallocator.Deallocate(pointer.PersistentAddress) } diff --git a/types/types.go b/types/types.go index b494aa5..f16861d 100644 --- a/types/types.go +++ b/types/types.go @@ -1,5 +1,7 @@ package types +import "sync/atomic" + const ( // UInt64Length is the number of bytes taken by uint64. UInt64Length = 8 @@ -58,6 +60,16 @@ func (na NodeAddress) Naked() NodeAddress { return na & flagNaked } +// Load loads node address atomically. +func Load(address *NodeAddress) NodeAddress { + return (NodeAddress)(atomic.LoadUint64((*uint64)(address))) +} + +// Store stores node address atomically. +func Store(address *NodeAddress, value NodeAddress) { + atomic.StoreUint64((*uint64)(address), (uint64)(value)) +} + const ( // FreeAddress means address is not assigned. FreeAddress NodeAddress = 0 diff --git a/wal/record.go b/wal/record.go index 09b2efa..d6c487e 100644 --- a/wal/record.go +++ b/wal/record.go @@ -268,42 +268,18 @@ func Set3[T1, T2, T3 comparable]( return nil } -// Set4 copies four objects and stores changes in WAL node. -func Set4[T1, T2, T3, T4 comparable]( +// SetAddressAtomically copies address atomically and stores changes in WAL node. +func SetAddressAtomically( recorder *Recorder, tx *pipeline.TransactionRequest, dstNodeAddress qtypes.NodeAddress, - dst1, src1 *T1, - dst2, src2 *T2, - dst3, src3 *T3, - dst4, src4 *T4, + dst, src *qtypes.NodeAddress, ) error { - dst1B, err := Reserve(recorder, tx, dstNodeAddress, dst1) - if err != nil { - return err - } - dst2B, err := Reserve(recorder, tx, dstNodeAddress, dst2) - if err != nil { - return err - } - dst3B, err := Reserve(recorder, tx, dstNodeAddress, dst3) - if err != nil { - return err - } - dst4B, err := Reserve(recorder, tx, dstNodeAddress, dst4) + dst2, err := Reserve(recorder, tx, dstNodeAddress, dst) if err != nil { return err } - *dst1 = *src1 - *dst1B = *src1 - - *dst2 = *src2 - *dst2B = *src2 - - *dst3 = *src3 - *dst3B = *src3 - - *dst4 = *src4 - *dst4B = *src4 + *dst2 = *src + qtypes.Store(dst, *src) return nil }