diff --git a/alloc/state.go b/alloc/state.go index 8d8bf6c..e086930 100644 --- a/alloc/state.go +++ b/alloc/state.go @@ -121,6 +121,7 @@ func (s *State) Run(ctx context.Context) error { }) spawn("pump", parallel.Continue, func(ctx context.Context) error { defer close(s.allocationPoolCh) + return s.runPump( ctx, s.allocationCh, diff --git a/alloc/test.go b/alloc/test.go new file mode 100644 index 0000000..72737fa --- /dev/null +++ b/alloc/test.go @@ -0,0 +1,40 @@ +package alloc + +import ( + "context" + "testing" + + "github.com/pkg/errors" + + "github.com/outofforest/logger" + "github.com/outofforest/parallel" +) + +// RunInTest creates and runs state for unit tests. +func RunInTest(t *testing.T, size, nodesPerGroup uint64) (*State, error) { + state, stateDeallocFunc, err := NewState( + size, + nodesPerGroup, + false, + ) + if err != nil { + return nil, err + } + t.Cleanup(stateDeallocFunc) + + ctx, cancel := context.WithCancel(logger.WithLogger(context.Background(), logger.New(logger.DefaultConfig))) + t.Cleanup(cancel) + + group := parallel.NewGroup(ctx) + group.Spawn("state", parallel.Continue, state.Run) + + t.Cleanup(func() { + state.Close() + group.Exit(nil) + if err := group.Wait(); err != nil && !errors.Is(err, context.Canceled) { + t.Fatal(err) + } + }) + + return state, nil +} diff --git a/benchmark_test.go b/benchmark_test.go index 98cc753..dae7167 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -17,7 +17,6 @@ import ( "github.com/outofforest/quantum" "github.com/outofforest/quantum/alloc" "github.com/outofforest/quantum/persistent" - "github.com/outofforest/quantum/space" "github.com/outofforest/quantum/tx/genesis" "github.com/outofforest/quantum/tx/transfer" txtypes "github.com/outofforest/quantum/tx/types" @@ -111,8 +110,6 @@ func BenchmarkBalanceTransfer(b *testing.B) { panic(err) } - st := space.NewSpaceTest(s, nil, nil, nil, nil) - hashBuff := s.NewHashBuff() hashMatches := s.NewHashMatches() @@ -129,7 +126,7 @@ func BenchmarkBalanceTransfer(b *testing.B) { panic(err) } - fmt.Println(st.Stats()) + fmt.Println(s.Stats()) fmt.Println("===========================") genesisBalance, genesisExists := s.Query(txtypes.GenesisAccount, hashBuff, hashMatches) @@ -170,7 +167,7 @@ func BenchmarkBalanceTransfer(b *testing.B) { }() func() { - fmt.Println(st.Stats()) + fmt.Println(s.Stats()) genesisBalance, genesisExists := s.Query(txtypes.GenesisAccount, hashBuff, hashMatches) require.True(b, genesisExists) diff --git a/db.go b/db.go index c815465..76ec9aa 100644 --- a/db.go +++ b/db.go @@ -297,22 +297,19 @@ func (db *DB) deleteSnapshot( if err != nil { return err } - exists, err := snapshotInfoValue.Exists(snapshotID, tx, walRecorder, allocator, snapshotHashBuff, - snapshotHashMatches) + exists, err := snapshotInfoValue.Exists(tx, walRecorder, allocator, snapshotHashBuff, snapshotHashMatches) if err != nil { return err } if !exists { return errors.Errorf("snapshot %d to delete does not exist", snapshotID) } - snapshotInfo, err := snapshotInfoValue.Value(snapshotID, tx, walRecorder, allocator, snapshotHashBuff, - snapshotHashMatches) + snapshotInfo, err := snapshotInfoValue.Value(tx, walRecorder, allocator, snapshotHashBuff, snapshotHashMatches) if err != nil { return err } - if err := snapshotInfoValue.Delete(snapshotID, tx, walRecorder, allocator, snapshotHashBuff, - snapshotHashMatches); err != nil { + if err := snapshotInfoValue.Delete(tx, walRecorder, allocator, snapshotHashBuff, snapshotHashMatches); err != nil { return err } @@ -322,16 +319,14 @@ func (db *DB) deleteSnapshot( return err } - exists, err = nextSnapshotInfoValue.Exists(snapshotID, tx, walRecorder, allocator, snapshotHashBuff, - snapshotHashMatches) + exists, err = nextSnapshotInfoValue.Exists(tx, walRecorder, allocator, snapshotHashBuff, snapshotHashMatches) if err != nil { return err } if !exists { return errors.Errorf("next snapshot %d does not exist", snapshotID) } - nextSnapshotInfo, err := nextSnapshotInfoValue.Value(snapshotID, tx, walRecorder, allocator, snapshotHashBuff, - snapshotHashMatches) + nextSnapshotInfo, err := nextSnapshotInfoValue.Value(tx, walRecorder, allocator, snapshotHashBuff, snapshotHashMatches) if err != nil { return err } @@ -375,7 +370,7 @@ func (db *DB) deleteSnapshot( if err != nil { return err } - listRootAddress, err := deallocationListValue.Value(snapshotID, tx, walRecorder, allocator, deallocationHashBuff, + listRootAddress, err := deallocationListValue.Value(tx, walRecorder, allocator, deallocationHashBuff, deallocationHashMatches) if err != nil { return err @@ -387,7 +382,6 @@ func (db *DB) deleteSnapshot( } if listRootAddress != originalListRootAddress { if err := deallocationListValue.Set( - snapshotID, tx, walRecorder, allocator, @@ -403,7 +397,6 @@ func (db *DB) deleteSnapshot( nextSnapshotInfo.DeallocationRoot = snapshotInfo.DeallocationRoot nextSnapshotInfo.PreviousSnapshotID = snapshotInfo.PreviousSnapshotID if err := nextSnapshotInfoValue.Set( - snapshotID, tx, walRecorder, allocator, @@ -428,8 +421,8 @@ func (db *DB) deleteSnapshot( if err != nil { return err } - exists, err := previousSnapshotInfoValue.Exists(snapshotID, - tx, walRecorder, allocator, snapshotHashBuff, snapshotHashMatches) + exists, err := previousSnapshotInfoValue.Exists(tx, walRecorder, allocator, snapshotHashBuff, + snapshotHashMatches) if err != nil { return err } @@ -437,15 +430,14 @@ func (db *DB) deleteSnapshot( return errors.Errorf("previous snapshot %d does not exist", snapshotID) } - previousSnapshotInfo, err := previousSnapshotInfoValue.Value(snapshotID, tx, walRecorder, allocator, - snapshotHashBuff, snapshotHashMatches) + previousSnapshotInfo, err := previousSnapshotInfoValue.Value(tx, walRecorder, allocator, snapshotHashBuff, + snapshotHashMatches) if err != nil { return err } previousSnapshotInfo.NextSnapshotID = snapshotInfo.NextSnapshotID if err := previousSnapshotInfoValue.Set( - snapshotID, tx, walRecorder, allocator, @@ -486,14 +478,14 @@ func (db *DB) commit( if err != nil { return err } - exists, err := deallocationListValue.Exists(commitSnapshotID, tx, walRecorder, allocator, - deallocationHashBuff, deallocationHashMatches) + exists, err := deallocationListValue.Exists(tx, walRecorder, allocator, deallocationHashBuff, + deallocationHashMatches) if err != nil { return err } if exists { - v, err := deallocationListValue.Value(commitSnapshotID, tx, walRecorder, allocator, - deallocationHashBuff, deallocationHashMatches) + v, err := deallocationListValue.Value(tx, walRecorder, allocator, deallocationHashBuff, + deallocationHashMatches) if err != nil { return err } @@ -503,7 +495,6 @@ func (db *DB) commit( } } if err := deallocationListValue.Set( - commitSnapshotID, tx, walRecorder, allocator, @@ -525,7 +516,6 @@ func (db *DB) commit( return err } if err := nextSnapshotInfoValue.Set( - commitSnapshotID, tx, walRecorder, allocator, @@ -610,8 +600,7 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read if req.Transaction != nil { switch tx := req.Transaction.(type) { case *transfer.Tx: - if err := tx.Execute(db.singularityNode.LastSnapshotID, req, walRecorder, allocator, hashBuff, - hashMatches); err != nil { + if err := tx.Execute(req, walRecorder, allocator, hashBuff, hashMatches); err != nil { return err } case *commitTx: diff --git a/space/no_concurrency_test.go b/space/no_concurrency_test.go new file mode 100644 index 0000000..2bf166a --- /dev/null +++ b/space/no_concurrency_test.go @@ -0,0 +1,197 @@ +// Github actions run on machines not supporting AVX-512 instructions. +//go:build nogithub + +package space + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/outofforest/quantum/alloc" + txtypes "github.com/outofforest/quantum/tx/types" + "github.com/outofforest/quantum/types" +) + +const ( + nodesPerGroup = 100 + stateSize = 100 * nodesPerGroup * types.NodeLength +) + +// TestCRUDOnRootDataNode tests basic CRUD operations using one data item on single data node being the root +// of the space. +func TestCRUDOnRootDataNode(t *testing.T) { + requireT := require.New(t) + + var ( + snapshotID types.SnapshotID = 1 + + account = TestKey[txtypes.Account]{ + Key: txtypes.Account{0x01}, + KeyHash: 1, + } + amount txtypes.Amount = 100 + ) + + state, err := alloc.RunInTest(t, stateSize, nodesPerGroup) + requireT.NoError(err) + + s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKey) + + // Read non-existing + + balance, exists := s.Query(account) + requireT.False(exists) + requireT.Equal(txtypes.Amount(0), balance) + + v, err := s.NewEntry(snapshotID, account, StageData) + requireT.NoError(err) + + exists, err = s.KeyExists(v) + requireT.NoError(err) + requireT.False(exists) + + balance, err = s.ReadKey(v) + requireT.NoError(err) + requireT.Equal(txtypes.Amount(0), balance) + + // Create + + requireT.NoError(s.SetKey(v, amount)) + + // Read existing + + balance, exists = s.Query(account) + requireT.True(exists) + requireT.Equal(amount, balance) + + exists, err = s.KeyExists(v) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v) + requireT.NoError(err) + requireT.Equal(amount, balance) + + v2, err := s.NewEntry(snapshotID, account, StageData) + requireT.NoError(err) + + exists, err = s.KeyExists(v2) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v2) + requireT.NoError(err) + requireT.Equal(amount, balance) + + v3, err := s.NewEntry(snapshotID, account, StagePointer0) + requireT.NoError(err) + + exists, err = s.KeyExists(v3) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v3) + requireT.NoError(err) + requireT.Equal(amount, balance) + + // Update 1 + + requireT.NoError(s.SetKey(v3, amount+1)) + + exists, err = s.KeyExists(v3) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v3) + requireT.NoError(err) + requireT.Equal(amount+1, balance) + + balance, exists = s.Query(account) + requireT.True(exists) + requireT.Equal(amount+1, balance) + + v4, err := s.NewEntry(snapshotID, account, StagePointer0) + requireT.NoError(err) + + exists, err = s.KeyExists(v4) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v4) + requireT.NoError(err) + requireT.Equal(amount+1, balance) + + // Update 2 + + requireT.NoError(s.SetKey(v4, amount+2)) + + exists, err = s.KeyExists(v4) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v4) + requireT.NoError(err) + requireT.Equal(amount+2, balance) + + balance, exists = s.Query(account) + requireT.True(exists) + requireT.Equal(amount+2, balance) + + v5, err := s.NewEntry(snapshotID, account, StagePointer0) + requireT.NoError(err) + + exists, err = s.KeyExists(v5) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v5) + requireT.NoError(err) + requireT.Equal(amount+2, balance) + + // Delete 1 + + requireT.NoError(s.DeleteKey(v5)) + + exists, err = s.KeyExists(v5) + requireT.NoError(err) + requireT.False(exists) + + balance, err = s.ReadKey(v5) + requireT.NoError(err) + requireT.Equal(txtypes.Amount(0), balance) + + balance, exists = s.Query(account) + requireT.False(exists) + requireT.Equal(txtypes.Amount(0), balance) + + // Recreate + + v, err = s.NewEntry(snapshotID, account, StagePointer0) + requireT.NoError(err) + + requireT.NoError(s.SetKey(v, amount)) + + balance, exists = s.Query(account) + requireT.True(exists) + requireT.Equal(amount, balance) + + // Delete 2 + + v2, err = s.NewEntry(snapshotID, account, StagePointer0) + requireT.NoError(err) + + requireT.NoError(s.DeleteKey(v2)) + + exists, err = s.KeyExists(v2) + requireT.NoError(err) + requireT.False(exists) + + balance, err = s.ReadKey(v2) + requireT.NoError(err) + requireT.Equal(txtypes.Amount(0), balance) + + balance, exists = s.Query(account) + requireT.False(exists) + requireT.Equal(txtypes.Amount(0), balance) +} diff --git a/space/space.go b/space/space.go index da5a2d0..6c680f0 100644 --- a/space/space.go +++ b/space/space.go @@ -3,6 +3,7 @@ package space import ( "math" "math/bits" + "sort" "unsafe" "github.com/cespare/xxhash" @@ -100,7 +101,7 @@ func (s *Space[K, V]) Find( } } - return s.find(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches, hashKey) + return s.find(v, tx, walRecorder, allocator, hashBuff, hashMatches, hashKey) } // Query queries the key. @@ -116,6 +117,92 @@ func (s *Space[K, V]) Iterator() func(func(item *types.DataItem[K, V]) bool) { } } +// Nodes returns the list of nodes allocated by the tree. +func (s *Space[K, V]) Nodes() []types.NodeAddress { + switch s.config.SpaceRoot.Pointer.VolatileAddress.State() { + case types.StateFree: + return nil + case types.StateData: + return []types.NodeAddress{s.config.SpaceRoot.Pointer.VolatileAddress} + } + + nodes := []types.NodeAddress{} + stack := []types.NodeAddress{s.config.SpaceRoot.Pointer.VolatileAddress.Naked()} + + for { + if len(stack) == 0 { + sort.Slice(nodes, func(i, j int) bool { + return nodes[i] < nodes[j] + }) + + return nodes + } + + pointerNodeAddress := stack[len(stack)-1] + stack = stack[:len(stack)-1] + nodes = append(nodes, pointerNodeAddress) + + pointerNode := ProjectPointerNode(s.config.State.Node(pointerNodeAddress.Naked())) + for pi := range pointerNode.Pointers { + switch pointerNode.Pointers[pi].VolatileAddress.State() { + case types.StateFree: + case types.StateData: + nodes = append(nodes, pointerNode.Pointers[pi].VolatileAddress) + case types.StatePointer: + stack = append(stack, pointerNode.Pointers[pi].VolatileAddress.Naked()) + } + } + } +} + +// Stats returns space-related statistics. +func (s *Space[K, V]) Stats() (uint64, uint64, uint64, float64) { + switch s.config.SpaceRoot.Pointer.VolatileAddress.State() { + case types.StateFree: + return 0, 0, 0, 0 + case types.StateData: + return 1, 0, 1, 0 + } + + stack := []types.NodeAddress{s.config.SpaceRoot.Pointer.VolatileAddress.Naked()} + + levels := map[types.NodeAddress]uint64{ + s.config.SpaceRoot.Pointer.VolatileAddress.Naked(): 1, + } + var maxLevel, pointerNodes, dataNodes, dataItems uint64 + + for { + if len(stack) == 0 { + return maxLevel, pointerNodes, dataNodes, float64(dataItems) / float64(dataNodes*s.numOfDataItems) + } + + n := stack[len(stack)-1] + level := levels[n] + 1 + pointerNodes++ + stack = stack[:len(stack)-1] + + pointerNode := ProjectPointerNode(s.config.State.Node(n.Naked())) + for pi := range pointerNode.Pointers { + volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress) + switch volatileAddress.State() { + case types.StateFree: + case types.StateData: + dataNodes++ + if level > maxLevel { + maxLevel = level + } + //nolint:gofmt,revive // looks like a bug in linter + for _, _ = range s.config.DataNodeAssistant.Iterator(s.config.State.Node(volatileAddress)) { + dataItems++ + } + case types.StatePointer: + stack = append(stack, volatileAddress.Naked()) + levels[volatileAddress.Naked()] = level + } + } + } +} + func (s *Space[K, V]) iterate(pointer *types.Pointer, yield func(item *types.DataItem[K, V]) bool) { volatileAddress := types.Load(&pointer.VolatileAddress) switch volatileAddress.State() { @@ -222,6 +309,7 @@ func (s *Space[K, V]) initEntry( ) error { initBytes := unsafe.Slice((*byte)(unsafe.Pointer(v)), s.initSize) copy(initBytes, s.defaultInit) + v.snapshotID = snapshotID v.keyHash = keyHash v.item.Key = key v.stage = stage @@ -248,7 +336,6 @@ func (s *Space[K, V]) initEntry( func (s *Space[K, V]) keyExists( v *Entry[K, V], - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, @@ -258,7 +345,7 @@ func (s *Space[K, V]) keyExists( ) (bool, error) { detectUpdate(v) - if err := s.find(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc); err != nil { + if err := s.find(v, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc); err != nil { return false, err } @@ -267,7 +354,6 @@ func (s *Space[K, V]) keyExists( func (s *Space[K, V]) readKey( v *Entry[K, V], - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, @@ -277,7 +363,7 @@ func (s *Space[K, V]) readKey( ) (V, error) { detectUpdate(v) - if err := s.find(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc); err != nil { + if err := s.find(v, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc); err != nil { return s.defaultValue, err } @@ -290,7 +376,6 @@ func (s *Space[K, V]) readKey( func (s *Space[K, V]) deleteKey( v *Entry[K, V], - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, @@ -300,7 +385,7 @@ func (s *Space[K, V]) deleteKey( ) error { detectUpdate(v) - if err := s.find(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc); err != nil { + if err := s.find(v, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc); err != nil { return err } @@ -317,6 +402,8 @@ func (s *Space[K, V]) deleteKey( return err } + v.exists = false + tx.AddStoreRequest(&v.storeRequest) } @@ -325,7 +412,6 @@ func (s *Space[K, V]) deleteKey( func (s *Space[K, V]) setKey( v *Entry[K, V], - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, @@ -338,12 +424,11 @@ func (s *Space[K, V]) setKey( detectUpdate(v) - return s.set(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc) + return s.set(v, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc) } func (s *Space[K, V]) find( v *Entry[K, V], - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, @@ -351,7 +436,7 @@ func (s *Space[K, V]) find( hashMatches []uint64, hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash, ) error { - if err := s.walkPointers(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashKeyFunc); err != nil { + if err := s.walkPointers(v, tx, walRecorder, allocator, hashBuff, hashKeyFunc); err != nil { return err } @@ -381,7 +466,6 @@ func (s *Space[K, V]) find( func (s *Space[K, V]) set( v *Entry[K, V], - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, @@ -389,7 +473,7 @@ func (s *Space[K, V]) set( hashMatches []uint64, hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash, ) error { - if err := s.walkPointers(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashKeyFunc); err != nil { + if err := s.walkPointers(v, tx, walRecorder, allocator, hashBuff, hashKeyFunc); err != nil { return err } @@ -412,7 +496,7 @@ func (s *Space[K, V]) set( if v.storeRequest.PointersToStore > 1 { if err := wal.Set2(walRecorder, tx, v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer.PersistentAddress, - &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.SnapshotID, &snapshotID, + &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.SnapshotID, &v.snapshotID, &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.PersistentAddress, &dataNodePersistentAddress, ); err != nil { return err @@ -425,7 +509,7 @@ func (s *Space[K, V]) set( return err } } else { - v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.SnapshotID = snapshotID + v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.SnapshotID = v.snapshotID v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.PersistentAddress = dataNodePersistentAddress types.Store(&v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress, dataNodeVolatileAddress) } @@ -451,6 +535,8 @@ func (s *Space[K, V]) set( return err } + v.exists = true + tx.AddStoreRequest(&v.storeRequest) return nil @@ -458,7 +544,7 @@ func (s *Space[K, V]) set( // Try to split data node. if v.storeRequest.PointersToStore > 1 { - splitDone, err := s.splitDataNode(snapshotID, tx, walRecorder, allocator, v.parentIndex, + splitDone, err := s.splitDataNode(v.snapshotID, tx, walRecorder, allocator, v.parentIndex, v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level) if err != nil { return err @@ -468,16 +554,16 @@ func (s *Space[K, V]) set( v.storeRequest.PointersToStore-- v.level-- - return s.set(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc) + return s.set(v, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc) } } // Add pointer node. - if err := s.addPointerNode(v, snapshotID, tx, walRecorder, allocator, conflict); err != nil { + if err := s.addPointerNode(v, tx, walRecorder, allocator, conflict); err != nil { return err } - return s.set(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc) + return s.set(v, tx, walRecorder, allocator, hashBuff, hashMatches, hashKeyFunc) } func (s *Space[K, V]) splitToIndex(parentNodeAddress types.NodeAddress, index uint64) (uint64, uint64) { @@ -596,7 +682,6 @@ func (s *Space[K, V]) splitDataNode( func (s *Space[K, V]) addPointerNode( v *Entry[K, V], - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, @@ -655,7 +740,7 @@ func (s *Space[K, V]) addPointerNode( types.Store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress) } - _, err = s.splitDataNode(snapshotID, tx, walRecorder, allocator, 0, pointerNodeRoot.Pointer, v.level+1) + _, err = s.splitDataNode(v.snapshotID, tx, walRecorder, allocator, 0, pointerNodeRoot.Pointer, v.level+1) return err } @@ -728,7 +813,6 @@ var pointerHops = [NumOfPointers][]uint64{ func (s *Space[K, V]) walkPointers( v *Entry[K, V], - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, @@ -736,7 +820,7 @@ func (s *Space[K, V]) walkPointers( hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash, ) error { for { - more, err := s.walkOnePointer(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashKeyFunc) + more, err := s.walkOnePointer(v, tx, walRecorder, allocator, hashBuff, hashKeyFunc) if err != nil || !more { return err } @@ -745,7 +829,6 @@ func (s *Space[K, V]) walkPointers( func (s *Space[K, V]) walkOnePointer( v *Entry[K, V], - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, @@ -808,7 +891,7 @@ func (s *Space[K, V]) walkOnePointer( //nolint:nestif if types.Load(&pointerNode.Pointers[index].VolatileAddress) != types.FreeAddress && - pointerNode.Pointers[index].SnapshotID != snapshotID { + pointerNode.Pointers[index].SnapshotID != v.snapshotID { persistentAddress, err := allocator.Allocate() if err != nil { return false, err @@ -822,13 +905,13 @@ func (s *Space[K, V]) walkOnePointer( if v.storeRequest.PointersToStore > 0 { if err := wal.Set2(walRecorder, tx, v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.PersistentAddress, - &pointerNode.Pointers[index].SnapshotID, &snapshotID, + &pointerNode.Pointers[index].SnapshotID, &v.snapshotID, &pointerNode.Pointers[index].PersistentAddress, &persistentAddress, ); err != nil { return false, err } } else { - pointerNode.Pointers[index].SnapshotID = snapshotID + pointerNode.Pointers[index].SnapshotID = v.snapshotID pointerNode.Pointers[index].PersistentAddress = persistentAddress } } @@ -895,6 +978,7 @@ type Entry[K, V comparable] struct { nextDataNode *types.NodeAddress storeRequest pipeline.StoreRequest + snapshotID types.SnapshotID itemP *types.DataItem[K, V] keyHashP *types.KeyHash keyHash types.KeyHash @@ -908,14 +992,13 @@ type Entry[K, V comparable] struct { // Value returns the value from entry. func (v *Entry[K, V]) Value( - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, hashBuff []byte, hashMatches []uint64, ) (V, error) { - return v.space.readKey(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches, hashKey[K]) + return v.space.readKey(v, tx, walRecorder, allocator, hashBuff, hashMatches, hashKey[K]) } // Key returns the key from entry. @@ -925,19 +1008,17 @@ func (v *Entry[K, V]) Key() K { // Exists returns true if entry exists in the space. func (v *Entry[K, V]) Exists( - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, hashBuff []byte, hashMatches []uint64, ) (bool, error) { - return v.space.keyExists(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches, hashKey[K]) + return v.space.keyExists(v, tx, walRecorder, allocator, hashBuff, hashMatches, hashKey[K]) } // Set sts value for entry. func (v *Entry[K, V]) Set( - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, @@ -945,19 +1026,18 @@ func (v *Entry[K, V]) Set( hashBuff []byte, hashMatches []uint64, ) error { - return v.space.setKey(v, snapshotID, tx, walRecorder, allocator, value, hashBuff, hashMatches, hashKey[K]) + return v.space.setKey(v, tx, walRecorder, allocator, value, hashBuff, hashMatches, hashKey[K]) } // Delete deletes the entry. func (v *Entry[K, V]) Delete( - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, hashBuff []byte, hashMatches []uint64, ) error { - return v.space.deleteKey(v, snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches, hashKey[K]) + return v.space.deleteKey(v, tx, walRecorder, allocator, hashBuff, hashMatches, hashKey[K]) } func hashKey[K comparable](key *K, buff []byte, level uint8) types.KeyHash { @@ -979,6 +1059,8 @@ func hashKey[K comparable](key *K, buff []byte, level uint8) types.KeyHash { } func detectUpdate[K, V comparable](v *Entry[K, V]) { + v.stage = StageData + switch { case *v.nextDataNode != types.FreeAddress: v.storeRequest.PointersToStore-- diff --git a/space/test.go b/space/test.go index 809e913..65969ec 100644 --- a/space/test.go +++ b/space/test.go @@ -1,7 +1,7 @@ package space import ( - "sort" + "github.com/stretchr/testify/require" "github.com/outofforest/quantum/alloc" "github.com/outofforest/quantum/pipeline" @@ -9,19 +9,38 @@ import ( "github.com/outofforest/quantum/wal" ) +// TestKey represents key with explicit hash used in tests. +type TestKey[K comparable] struct { + Key K + KeyHash types.KeyHash +} + // NewSpaceTest creates new wrapper for space testing. func NewSpaceTest[K, V comparable]( - s *Space[K, V], - tx *pipeline.TransactionRequest, - walRecorder *wal.Recorder, - allocator *alloc.Allocator, + t require.TestingT, + state *alloc.State, hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash, ) *SpaceTest[K, V] { + dataNodeAssistant, err := NewDataNodeAssistant[K, V]() + require.NoError(t, err) + + s := New[K, V](Config[K, V]{ + SpaceRoot: types.NodeRoot{ + Pointer: &types.Pointer{}, + Hash: &types.Hash{}, + }, + State: state, + DataNodeAssistant: dataNodeAssistant, + NoSnapshots: false, + }) + + txFactory := pipeline.NewTransactionRequestFactory() + return &SpaceTest[K, V]{ s: s, - tx: tx, - walRecorder: walRecorder, - allocator: allocator, + tx: txFactory.New(), + walRecorder: wal.NewRecorder(state, state.NewAllocator()), + allocator: state.NewAllocator(), hashKeyFunc: hashKeyFunc, hashBuff: s.NewHashBuff(), hashMatches: s.NewHashMatches(), @@ -44,35 +63,34 @@ type SpaceTest[K, V comparable] struct { // NewEntry initializes new entry. func (s *SpaceTest[K, V]) NewEntry( snapshotID types.SnapshotID, - key K, - keyHash types.KeyHash, + key TestKey[K], stage uint8, ) (*Entry[K, V], error) { v := &Entry[K, V]{} - if err := s.s.initEntry(v, snapshotID, s.tx, s.walRecorder, s.allocator, key, keyHash, stage); err != nil { + if err := s.s.initEntry(v, snapshotID, s.tx, s.walRecorder, s.allocator, key.Key, key.KeyHash, stage); err != nil { return nil, err } return v, nil } // KeyExists checks if key is set in the space. -func (s *SpaceTest[K, V]) KeyExists(v *Entry[K, V], snapshotID types.SnapshotID) (bool, error) { - return s.s.keyExists(v, snapshotID, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashMatches, s.hashKeyFunc) +func (s *SpaceTest[K, V]) KeyExists(v *Entry[K, V]) (bool, error) { + return s.s.keyExists(v, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashMatches, s.hashKeyFunc) } // ReadKey reads value for the key. -func (s *SpaceTest[K, V]) ReadKey(v *Entry[K, V], snapshotID types.SnapshotID) (V, error) { - return s.s.readKey(v, snapshotID, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashMatches, s.hashKeyFunc) +func (s *SpaceTest[K, V]) ReadKey(v *Entry[K, V]) (V, error) { + return s.s.readKey(v, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashMatches, s.hashKeyFunc) } // DeleteKey deletes key from space. -func (s *SpaceTest[K, V]) DeleteKey(v *Entry[K, V], snapshotID types.SnapshotID) error { - return s.s.deleteKey(v, snapshotID, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashMatches, s.hashKeyFunc) +func (s *SpaceTest[K, V]) DeleteKey(v *Entry[K, V]) error { + return s.s.deleteKey(v, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashMatches, s.hashKeyFunc) } // SetKey sets value for the key. -func (s *SpaceTest[K, V]) SetKey(v *Entry[K, V], snapshotID types.SnapshotID, value V) error { - return s.s.setKey(v, snapshotID, s.tx, s.walRecorder, s.allocator, value, s.hashBuff, s.hashMatches, s.hashKeyFunc) +func (s *SpaceTest[K, V]) SetKey(v *Entry[K, V], value V) error { + return s.s.setKey(v, s.tx, s.walRecorder, s.allocator, value, s.hashBuff, s.hashMatches, s.hashKeyFunc) } // SplitDataNode splits data node. @@ -83,18 +101,18 @@ func (s *SpaceTest[K, V]) SplitDataNode(v *Entry[K, V], snapshotID types.Snapsho } // AddPointerNode adds pointer node. -func (s *SpaceTest[K, V]) AddPointerNode(v *Entry[K, V], snapshotID types.SnapshotID, conflict bool) error { - return s.s.addPointerNode(v, snapshotID, s.tx, s.walRecorder, s.allocator, conflict) +func (s *SpaceTest[K, V]) AddPointerNode(v *Entry[K, V], conflict bool) error { + return s.s.addPointerNode(v, s.tx, s.walRecorder, s.allocator, conflict) } // WalkPointers walk all the pointers to find the key. -func (s *SpaceTest[K, V]) WalkPointers(v *Entry[K, V], snapshotID types.SnapshotID) error { - return s.s.walkPointers(v, snapshotID, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashKeyFunc) +func (s *SpaceTest[K, V]) WalkPointers(v *Entry[K, V]) error { + return s.s.walkPointers(v, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashKeyFunc) } // WalkOnePointer walks one pointer only. -func (s *SpaceTest[K, V]) WalkOnePointer(v *Entry[K, V], snapshotID types.SnapshotID) (bool, error) { - return s.s.walkOnePointer(v, snapshotID, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashKeyFunc) +func (s *SpaceTest[K, V]) WalkOnePointer(v *Entry[K, V]) (bool, error) { + return s.s.walkOnePointer(v, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashKeyFunc) } // WalkDataItems walks items in data node to find position for the key. @@ -103,99 +121,11 @@ func (s *SpaceTest[K, V]) WalkDataItems(v *Entry[K, V]) bool { } // Query queries the space for a key. -func (s *SpaceTest[K, V]) Query(key K, keyHash types.KeyHash) (V, bool) { - return s.s.query(key, keyHash, s.hashBuff, s.hashMatches, s.hashKeyFunc) +func (s *SpaceTest[K, V]) Query(key TestKey[K]) (V, bool) { + return s.s.query(key.Key, key.KeyHash, s.hashBuff, s.hashMatches, s.hashKeyFunc) } // Find finds the location in the tree for key. -func (s *SpaceTest[K, V]) Find(v *Entry[K, V], snapshotID types.SnapshotID) error { - return s.s.find(v, snapshotID, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashMatches, s.hashKeyFunc) -} - -// Nodes returns the list of nodes allocated by the tree. -func (s *SpaceTest[K, V]) Nodes() []types.NodeAddress { - switch s.s.config.SpaceRoot.Pointer.VolatileAddress.State() { - case types.StateFree: - return nil - case types.StateData: - return []types.NodeAddress{s.s.config.SpaceRoot.Pointer.VolatileAddress} - } - - nodes := []types.NodeAddress{} - stack := []types.NodeAddress{s.s.config.SpaceRoot.Pointer.VolatileAddress.Naked()} - - for { - if len(stack) == 0 { - sort.Slice(nodes, func(i, j int) bool { - return nodes[i] < nodes[j] - }) - - return nodes - } - - pointerNodeAddress := stack[len(stack)-1] - stack = stack[:len(stack)-1] - nodes = append(nodes, pointerNodeAddress) - - pointerNode := ProjectPointerNode(s.s.config.State.Node(pointerNodeAddress.Naked())) - for pi := range pointerNode.Pointers { - switch pointerNode.Pointers[pi].VolatileAddress.State() { - case types.StateFree: - case types.StateData: - nodes = append(nodes, pointerNode.Pointers[pi].VolatileAddress) - case types.StatePointer: - stack = append(stack, pointerNode.Pointers[pi].VolatileAddress.Naked()) - } - } - } -} - -// Stats returns space-related statistics. -func (s *SpaceTest[K, V]) Stats() (uint64, uint64, uint64, float64) { - switch s.s.config.SpaceRoot.Pointer.VolatileAddress.State() { - case types.StateFree: - return 0, 0, 0, 0 - case types.StateData: - return 1, 0, 1, 0 - } - - stack := []types.NodeAddress{s.s.config.SpaceRoot.Pointer.VolatileAddress.Naked()} - - levels := map[types.NodeAddress]uint64{ - s.s.config.SpaceRoot.Pointer.VolatileAddress.Naked(): 1, - } - var maxLevel, pointerNodes, dataNodes, dataItems uint64 - - for { - if len(stack) == 0 { - return maxLevel, pointerNodes, dataNodes, float64(dataItems) / float64(dataNodes*s.s.numOfDataItems) - } - - n := stack[len(stack)-1] - level := levels[n] + 1 - pointerNodes++ - stack = stack[:len(stack)-1] - - pointerNode := ProjectPointerNode(s.s.config.State.Node(n.Naked())) - for pi := range pointerNode.Pointers { - volatileAddress := types.Load(&pointerNode.Pointers[pi].VolatileAddress) - switch volatileAddress.State() { - case types.StateFree: - case types.StateData: - dataNodes++ - if level > maxLevel { - maxLevel = level - } - //nolint:gofmt,revive // looks like a bug in linter - for _, _ = range s.s.config.DataNodeAssistant.Iterator(s.s.config.State.Node( - volatileAddress, - )) { - dataItems++ - } - case types.StatePointer: - stack = append(stack, volatileAddress.Naked()) - levels[volatileAddress.Naked()] = level - } - } - } +func (s *SpaceTest[K, V]) Find(v *Entry[K, V]) error { + return s.s.find(v, s.tx, s.walRecorder, s.allocator, s.hashBuff, s.hashMatches, s.hashKeyFunc) } diff --git a/tx/genesis/genesis.go b/tx/genesis/genesis.go index cbbfd46..e6dbe16 100644 --- a/tx/genesis/genesis.go +++ b/tx/genesis/genesis.go @@ -38,7 +38,6 @@ func (t *Tx) Execute( } if err := v.Set( - snapshotID, tx, walRecorder, allocator, diff --git a/tx/transfer/transfer.go b/tx/transfer/transfer.go index ac9be9b..a8d7b73 100644 --- a/tx/transfer/transfer.go +++ b/tx/transfer/transfer.go @@ -42,14 +42,13 @@ func (t *Tx) Prepare( // Execute executes transaction. func (t *Tx) Execute( - snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, hashBuff []byte, hashMatches []uint64, ) error { - fromBalance, err := t.from.Value(snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches) + fromBalance, err := t.from.Value(tx, walRecorder, allocator, hashBuff, hashMatches) if err != nil { return err } @@ -57,7 +56,7 @@ func (t *Tx) Execute( return errors.Errorf("sender's balance is too low, balance: %d, amount to send: %d", fromBalance, t.Amount) } - toBalance, err := t.to.Value(snapshotID, tx, walRecorder, allocator, hashBuff, hashMatches) + toBalance, err := t.to.Value(tx, walRecorder, allocator, hashBuff, hashMatches) if err != nil { return err } @@ -68,7 +67,6 @@ func (t *Tx) Execute( } if err := t.from.Set( - snapshotID, tx, walRecorder, allocator, @@ -80,7 +78,6 @@ func (t *Tx) Execute( } return t.to.Set( - snapshotID, tx, walRecorder, allocator,