From bd31e9c37c7d0a748e981a69935a951b3d478367 Mon Sep 17 00:00:00 2001 From: Wojciech Malota-Wojcik Date: Wed, 4 Dec 2024 21:47:33 +0100 Subject: [PATCH] Test adding new parent node and resolving conflicts --- space/logic_test.go | 607 +++++++++++++++++++++++++++++++++++ space/no_concurrency_test.go | 357 -------------------- space/space.go | 119 ++++++- space/test.go | 14 +- 4 files changed, 732 insertions(+), 365 deletions(-) create mode 100644 space/logic_test.go delete mode 100644 space/no_concurrency_test.go diff --git a/space/logic_test.go b/space/logic_test.go new file mode 100644 index 0000000..a7a7aa1 --- /dev/null +++ b/space/logic_test.go @@ -0,0 +1,607 @@ +// Github actions run on machines not supporting AVX-512 instructions. +//go:build nogithub + +package space + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/outofforest/quantum/alloc" + txtypes "github.com/outofforest/quantum/tx/types" + "github.com/outofforest/quantum/types" +) + +const ( + nodesPerGroup = 100 + stateSize = 100 * nodesPerGroup * types.NodeLength +) + +// TestCRUDOnRootDataNode tests basic CRUD operations using one data item on single data node being the root +// of the space. +func TestCRUDOnRootDataNode(t *testing.T) { + requireT := require.New(t) + + var ( + snapshotID types.SnapshotID = 1 + + account = TestKey[txtypes.Account]{ + Key: txtypes.Account{0x01}, + KeyHash: 1, + } + amount txtypes.Amount = 100 + ) + + state, err := alloc.RunInTest(t, stateSize, nodesPerGroup) + requireT.NoError(err) + + s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKey) + + // Read non-existing. + + balance, exists := s.Query(account) + requireT.False(exists) + requireT.Equal(txtypes.Amount(0), balance) + + v, err := s.NewEntry(snapshotID, account, StageData) + requireT.NoError(err) + + exists, err = s.KeyExists(v) + requireT.NoError(err) + requireT.False(exists) + + balance, err = s.ReadKey(v) + requireT.NoError(err) + requireT.Equal(txtypes.Amount(0), balance) + + // Create. + + requireT.NoError(s.SetKey(v, amount)) + + // Read existing. + + balance, exists = s.Query(account) + requireT.True(exists) + requireT.Equal(amount, balance) + + exists, err = s.KeyExists(v) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v) + requireT.NoError(err) + requireT.Equal(amount, balance) + + v2, err := s.NewEntry(snapshotID, account, StageData) + requireT.NoError(err) + + exists, err = s.KeyExists(v2) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v2) + requireT.NoError(err) + requireT.Equal(amount, balance) + + v3, err := s.NewEntry(snapshotID, account, StagePointer0) + requireT.NoError(err) + + exists, err = s.KeyExists(v3) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v3) + requireT.NoError(err) + requireT.Equal(amount, balance) + + // Update 1. + + requireT.NoError(s.SetKey(v3, amount+1)) + + exists, err = s.KeyExists(v3) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v3) + requireT.NoError(err) + requireT.Equal(amount+1, balance) + + balance, exists = s.Query(account) + requireT.True(exists) + requireT.Equal(amount+1, balance) + + v4, err := s.NewEntry(snapshotID, account, StagePointer0) + requireT.NoError(err) + + exists, err = s.KeyExists(v4) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v4) + requireT.NoError(err) + requireT.Equal(amount+1, balance) + + // Update 2. + + requireT.NoError(s.SetKey(v4, amount+2)) + + exists, err = s.KeyExists(v4) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v4) + requireT.NoError(err) + requireT.Equal(amount+2, balance) + + balance, exists = s.Query(account) + requireT.True(exists) + requireT.Equal(amount+2, balance) + + v5, err := s.NewEntry(snapshotID, account, StagePointer0) + requireT.NoError(err) + + exists, err = s.KeyExists(v5) + requireT.NoError(err) + requireT.True(exists) + + balance, err = s.ReadKey(v5) + requireT.NoError(err) + requireT.Equal(amount+2, balance) + + // Delete 1. + + requireT.NoError(s.DeleteKey(v5)) + + exists, err = s.KeyExists(v5) + requireT.NoError(err) + requireT.False(exists) + + balance, err = s.ReadKey(v5) + requireT.NoError(err) + requireT.Equal(txtypes.Amount(0), balance) + + balance, exists = s.Query(account) + requireT.False(exists) + requireT.Equal(txtypes.Amount(0), balance) + + // Recreate. + + v, err = s.NewEntry(snapshotID, account, StagePointer0) + requireT.NoError(err) + + requireT.NoError(s.SetKey(v, amount)) + + balance, exists = s.Query(account) + requireT.True(exists) + requireT.Equal(amount, balance) + + // Delete 2. + + v2, err = s.NewEntry(snapshotID, account, StagePointer0) + requireT.NoError(err) + + requireT.NoError(s.DeleteKey(v2)) + + exists, err = s.KeyExists(v2) + requireT.NoError(err) + requireT.False(exists) + + balance, err = s.ReadKey(v2) + requireT.NoError(err) + requireT.Equal(txtypes.Amount(0), balance) + + balance, exists = s.Query(account) + requireT.False(exists) + requireT.Equal(txtypes.Amount(0), balance) +} + +// TestSetConflictingHashesOnRootDataNode sets many keys using the same hash to verify that they are not overwritten +// by each other. +func TestSetConflictingHashesOnRootDataNode(t *testing.T) { + const ( + numOfItems = 50 + snapshotID types.SnapshotID = 1 + keyHash types.KeyHash = 1 // Same key hash is intentionally used for all the items to test conflicts. + ) + + requireT := require.New(t) + + state, err := alloc.RunInTest(t, stateSize, nodesPerGroup) + requireT.NoError(err) + + s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKey) + + // Create. + + for i := range uint8(numOfItems) { + v, err := s.NewEntry(snapshotID, TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: keyHash, + }, StagePointer0) + requireT.NoError(err) + requireT.NoError(s.SetKey(v, txtypes.Amount(i))) + } + + // Verify items exist. + + for i := range uint8(numOfItems) { + key := TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: keyHash, + } + amount := txtypes.Amount(i) + + v, err := s.NewEntry(snapshotID, key, StagePointer0) + requireT.NoError(err) + + exists, err := s.KeyExists(v) + requireT.NoError(err) + requireT.True(exists) + + balance, err := s.ReadKey(v) + requireT.NoError(err) + requireT.Equal(amount, balance) + + balance, exists = s.Query(key) + requireT.True(exists) + requireT.Equal(amount, balance) + } + + // Update every second item. + + for i := uint8(0); i < numOfItems; i += 2 { + key := TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: keyHash, + } + amount := txtypes.Amount(10 * i) + + v, err := s.NewEntry(snapshotID, key, StagePointer0) + requireT.NoError(err) + requireT.NoError(s.SetKey(v, amount)) + + v2, err := s.NewEntry(snapshotID, key, StagePointer0) + requireT.NoError(err) + + exists, err := s.KeyExists(v2) + requireT.NoError(err) + requireT.True(exists) + + balance, err := s.ReadKey(v2) + requireT.NoError(err) + requireT.Equal(amount, balance) + + balance, exists = s.Query(key) + requireT.True(exists) + requireT.Equal(amount, balance) + } + + // Verify all the other items stay untouched. + + for i := uint8(1); i < numOfItems; i += 2 { + key := TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: keyHash, + } + amount := txtypes.Amount(i) + + v, err := s.NewEntry(snapshotID, key, StagePointer0) + requireT.NoError(err) + + exists, err := s.KeyExists(v) + requireT.NoError(err) + requireT.True(exists) + + balance, err := s.ReadKey(v) + requireT.NoError(err) + requireT.Equal(amount, balance) + + balance, exists = s.Query(key) + requireT.True(exists) + requireT.Equal(amount, balance) + } + + // Delete every second item. + + for i := uint8(0); i < numOfItems; i += 2 { + key := TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: keyHash, + } + + v, err := s.NewEntry(snapshotID, key, StagePointer0) + requireT.NoError(err) + requireT.NoError(s.DeleteKey(v)) + + v2, err := s.NewEntry(snapshotID, key, StagePointer0) + requireT.NoError(err) + + exists, err := s.KeyExists(v2) + requireT.NoError(err) + requireT.False(exists) + + balance, err := s.ReadKey(v2) + requireT.NoError(err) + requireT.Equal(txtypes.Amount(0), balance) + + balance, exists = s.Query(key) + requireT.False(exists) + requireT.Equal(txtypes.Amount(0), balance) + } + + // Verify all the other items still exist. + + for i := uint8(1); i < numOfItems; i += 2 { + key := TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: keyHash, + } + amount := txtypes.Amount(i) + + v, err := s.NewEntry(snapshotID, key, StagePointer0) + requireT.NoError(err) + + exists, err := s.KeyExists(v) + requireT.NoError(err) + requireT.True(exists) + + balance, err := s.ReadKey(v) + requireT.NoError(err) + requireT.Equal(amount, balance) + + balance, exists = s.Query(key) + requireT.True(exists) + requireT.Equal(amount, balance) + } +} + +// TestAddingParentNodeWithoutConflictResolution verifies that all the data items with the same key hash stay +// in the same data node without key hash recalculation, if space is instructed to add new parent node without +// conflict resolution. +func TestAddingParentNodeWithoutConflictResolution(t *testing.T) { + const ( + // It is selected this way to be sure that nothing is moved to the next data node. + numOfItems = NumOfPointers + snapshotID types.SnapshotID = 1 + keyHash types.KeyHash = 1 // Same key hash is intentionally used for all the items. + ) + + requireT := require.New(t) + + state, err := alloc.RunInTest(t, stateSize, nodesPerGroup) + requireT.NoError(err) + + s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKey) + + // Create. + + for i := range uint8(numOfItems - 1) { + v, err := s.NewEntry(snapshotID, TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: keyHash, + }, StagePointer0) + requireT.NoError(err) + requireT.NoError(s.SetKey(v, txtypes.Amount(i))) + requireT.Equal(uint8(0), v.level) + } + + // Add parent node and set the last item. + + v, err := s.NewEntry(snapshotID, TestKey[txtypes.Account]{ + Key: txtypes.Account{numOfItems - 1}, + KeyHash: keyHash, + }, StageData) + requireT.NoError(err) + requireT.NoError(s.Find(v)) + + // Store the address of the data node to be sure that no items have been moved. + dataNodeAddress := v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress + + requireT.NoError(s.AddPointerNode(v, false)) + requireT.NoError(s.SetKey(v, txtypes.Amount(numOfItems-1))) + requireT.Equal(uint8(1), v.level) + + // Verify that all the items has been moved correctly without recomputing hashes. + + for i := range uint8(numOfItems) { + key := TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: keyHash, + } + amount := txtypes.Amount(i) + + v, err := s.NewEntry(snapshotID, key, StagePointer0) + requireT.NoError(err) + + exists, err := s.KeyExists(v) + requireT.NoError(err) + requireT.True(exists) + + balance, err := s.ReadKey(v) + requireT.NoError(err) + requireT.Equal(amount, balance) + + balance, exists = s.Query(key) + requireT.True(exists) + requireT.Equal(amount, balance) + + requireT.Equal(uint8(1), v.level) + requireT.Equal(keyHash, v.keyHash) + requireT.Equal(keyHash, *v.keyHashP) + requireT.Equal(dataNodeAddress, v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress) + } +} + +// TestAddingParentNodeWithConflictResolution verifies that key hashes are recomputed and data items are redistributed +// if space is instructed to add new parent node with conflict resolution. +func TestAddingParentNodeWithConflictResolution(t *testing.T) { + const ( + // It is selected this way so half of the items is moved to another data node. + numOfItems = NumOfPointers + snapshotID types.SnapshotID = 1 + keyHash types.KeyHash = 1 // Same key hash is intentionally used for all the items. + ) + + hashKeyFunc := func(key *txtypes.Account, buff []byte, level uint8) types.KeyHash { + return types.KeyHash(key[0]) + 1 // +1 to avoid 0 + } + + requireT := require.New(t) + + state, err := alloc.RunInTest(t, stateSize, nodesPerGroup) + requireT.NoError(err) + + s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKeyFunc) + + // Create. + + for i := range uint8(numOfItems - 1) { + v, err := s.NewEntry(snapshotID, TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: keyHash, + }, StagePointer0) + requireT.NoError(err) + requireT.NoError(s.SetKey(v, txtypes.Amount(i))) + requireT.Equal(uint8(0), v.level) + } + + // Add parent node and set the last item. + + v, err := s.NewEntry(snapshotID, TestKey[txtypes.Account]{ + Key: txtypes.Account{numOfItems - 1}, + KeyHash: keyHash, + }, StageData) + requireT.NoError(err) + requireT.NoError(s.Find(v)) + + // Store the address of the data node to be sure that half of the items is moved to another data node. + dataNodeAddress := v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress + + requireT.NoError(s.AddPointerNode(v, true)) + requireT.NoError(s.SetKey(v, txtypes.Amount(numOfItems-1))) + requireT.Equal(uint8(1), v.level) + + // Verify that all the items has been moved correctly without recomputing hashes. + + dataNodes := map[types.NodeAddress]uint64{} + + for i := range uint8(numOfItems) { + key := TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: keyHash, + } + amount := txtypes.Amount(i) + + v, err := s.NewEntry(snapshotID, key, StagePointer0) + requireT.NoError(err) + + exists, err := s.KeyExists(v) + requireT.NoError(err) + requireT.True(exists) + + balance, err := s.ReadKey(v) + requireT.NoError(err) + requireT.Equal(amount, balance) + + balance, exists = s.Query(key) + requireT.True(exists) + requireT.Equal(amount, balance) + + requireT.Equal(uint8(1), v.level) + requireT.Equal(types.KeyHash(i+1), v.keyHash) + requireT.Equal(types.KeyHash(i+1), *v.keyHashP) + + dataNodes[v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress]++ + } + + requireT.Len(dataNodes, 2) + for _, n := range dataNodes { + requireT.Equal(uint64(numOfItems/2), n) + } + requireT.Equal(uint64(numOfItems/2), dataNodes[dataNodeAddress]) +} + +// TestAddingParentNodeForNonConflictingDataItems verifies that key hashes are not recomputed if there is no conflict +// and data items are redistributed if space is instructed to add new parent node without conflict resolution. +func TestAddingParentNodeForNonConflictingDataItems(t *testing.T) { + const ( + // It is selected this way so half of the items is moved to another data node. + numOfItems = NumOfPointers + snapshotID types.SnapshotID = 1 + ) + + requireT := require.New(t) + + state, err := alloc.RunInTest(t, stateSize, nodesPerGroup) + requireT.NoError(err) + + s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKey) + + // Create. + + for i := range uint8(numOfItems - 1) { + v, err := s.NewEntry(snapshotID, TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: types.KeyHash(i + 1), // +1 to avoid 0 + }, StagePointer0) + requireT.NoError(err) + requireT.NoError(s.SetKey(v, txtypes.Amount(i))) + requireT.Equal(uint8(0), v.level) + } + + // Add parent node and set the last item. + + v, err := s.NewEntry(snapshotID, TestKey[txtypes.Account]{ + Key: txtypes.Account{numOfItems - 1}, + KeyHash: types.KeyHash(numOfItems), + }, StageData) + requireT.NoError(err) + requireT.NoError(s.Find(v)) + + // Store the address of the data node to be sure that half of the items is moved to another data node. + dataNodeAddress := v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress + + requireT.NoError(s.AddPointerNode(v, false)) + requireT.NoError(s.SetKey(v, txtypes.Amount(numOfItems-1))) + requireT.Equal(uint8(1), v.level) + + // Verify that all the items has been moved correctly without recomputing hashes. + + dataNodes := map[types.NodeAddress]uint64{} + + for i := range uint8(numOfItems) { + key := TestKey[txtypes.Account]{ + Key: txtypes.Account{i}, + KeyHash: types.KeyHash(i + 1), // +1 to avoid 0 + } + amount := txtypes.Amount(i) + + v, err := s.NewEntry(snapshotID, key, StagePointer0) + requireT.NoError(err) + + exists, err := s.KeyExists(v) + requireT.NoError(err) + requireT.True(exists) + + balance, err := s.ReadKey(v) + requireT.NoError(err) + requireT.Equal(amount, balance) + + balance, exists = s.Query(key) + requireT.True(exists) + requireT.Equal(amount, balance) + + requireT.Equal(uint8(1), v.level) + requireT.Equal(types.KeyHash(i+1), v.keyHash) + requireT.Equal(types.KeyHash(i+1), *v.keyHashP) + + dataNodes[v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress]++ + } + + requireT.Len(dataNodes, 2) + for _, n := range dataNodes { + requireT.Equal(uint64(numOfItems/2), n) + } + requireT.Equal(uint64(numOfItems/2), dataNodes[dataNodeAddress]) +} diff --git a/space/no_concurrency_test.go b/space/no_concurrency_test.go deleted file mode 100644 index 9df64a2..0000000 --- a/space/no_concurrency_test.go +++ /dev/null @@ -1,357 +0,0 @@ -// Github actions run on machines not supporting AVX-512 instructions. -//go:build nogithub - -package space - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/outofforest/quantum/alloc" - txtypes "github.com/outofforest/quantum/tx/types" - "github.com/outofforest/quantum/types" -) - -const ( - nodesPerGroup = 100 - stateSize = 100 * nodesPerGroup * types.NodeLength -) - -// TestCRUDOnRootDataNode tests basic CRUD operations using one data item on single data node being the root -// of the space. -func TestCRUDOnRootDataNode(t *testing.T) { - requireT := require.New(t) - - var ( - snapshotID types.SnapshotID = 1 - - account = TestKey[txtypes.Account]{ - Key: txtypes.Account{0x01}, - KeyHash: 1, - } - amount txtypes.Amount = 100 - ) - - state, err := alloc.RunInTest(t, stateSize, nodesPerGroup) - requireT.NoError(err) - - s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKey) - - // Read non-existing - - balance, exists := s.Query(account) - requireT.False(exists) - requireT.Equal(txtypes.Amount(0), balance) - - v, err := s.NewEntry(snapshotID, account, StageData) - requireT.NoError(err) - - exists, err = s.KeyExists(v) - requireT.NoError(err) - requireT.False(exists) - - balance, err = s.ReadKey(v) - requireT.NoError(err) - requireT.Equal(txtypes.Amount(0), balance) - - // Create - - requireT.NoError(s.SetKey(v, amount)) - - // Read existing - - balance, exists = s.Query(account) - requireT.True(exists) - requireT.Equal(amount, balance) - - exists, err = s.KeyExists(v) - requireT.NoError(err) - requireT.True(exists) - - balance, err = s.ReadKey(v) - requireT.NoError(err) - requireT.Equal(amount, balance) - - v2, err := s.NewEntry(snapshotID, account, StageData) - requireT.NoError(err) - - exists, err = s.KeyExists(v2) - requireT.NoError(err) - requireT.True(exists) - - balance, err = s.ReadKey(v2) - requireT.NoError(err) - requireT.Equal(amount, balance) - - v3, err := s.NewEntry(snapshotID, account, StagePointer0) - requireT.NoError(err) - - exists, err = s.KeyExists(v3) - requireT.NoError(err) - requireT.True(exists) - - balance, err = s.ReadKey(v3) - requireT.NoError(err) - requireT.Equal(amount, balance) - - // Update 1 - - requireT.NoError(s.SetKey(v3, amount+1)) - - exists, err = s.KeyExists(v3) - requireT.NoError(err) - requireT.True(exists) - - balance, err = s.ReadKey(v3) - requireT.NoError(err) - requireT.Equal(amount+1, balance) - - balance, exists = s.Query(account) - requireT.True(exists) - requireT.Equal(amount+1, balance) - - v4, err := s.NewEntry(snapshotID, account, StagePointer0) - requireT.NoError(err) - - exists, err = s.KeyExists(v4) - requireT.NoError(err) - requireT.True(exists) - - balance, err = s.ReadKey(v4) - requireT.NoError(err) - requireT.Equal(amount+1, balance) - - // Update 2 - - requireT.NoError(s.SetKey(v4, amount+2)) - - exists, err = s.KeyExists(v4) - requireT.NoError(err) - requireT.True(exists) - - balance, err = s.ReadKey(v4) - requireT.NoError(err) - requireT.Equal(amount+2, balance) - - balance, exists = s.Query(account) - requireT.True(exists) - requireT.Equal(amount+2, balance) - - v5, err := s.NewEntry(snapshotID, account, StagePointer0) - requireT.NoError(err) - - exists, err = s.KeyExists(v5) - requireT.NoError(err) - requireT.True(exists) - - balance, err = s.ReadKey(v5) - requireT.NoError(err) - requireT.Equal(amount+2, balance) - - // Delete 1 - - requireT.NoError(s.DeleteKey(v5)) - - exists, err = s.KeyExists(v5) - requireT.NoError(err) - requireT.False(exists) - - balance, err = s.ReadKey(v5) - requireT.NoError(err) - requireT.Equal(txtypes.Amount(0), balance) - - balance, exists = s.Query(account) - requireT.False(exists) - requireT.Equal(txtypes.Amount(0), balance) - - // Recreate - - v, err = s.NewEntry(snapshotID, account, StagePointer0) - requireT.NoError(err) - - requireT.NoError(s.SetKey(v, amount)) - - balance, exists = s.Query(account) - requireT.True(exists) - requireT.Equal(amount, balance) - - // Delete 2 - - v2, err = s.NewEntry(snapshotID, account, StagePointer0) - requireT.NoError(err) - - requireT.NoError(s.DeleteKey(v2)) - - exists, err = s.KeyExists(v2) - requireT.NoError(err) - requireT.False(exists) - - balance, err = s.ReadKey(v2) - requireT.NoError(err) - requireT.Equal(txtypes.Amount(0), balance) - - balance, exists = s.Query(account) - requireT.False(exists) - requireT.Equal(txtypes.Amount(0), balance) -} - -// TestSetConflictingHashesOnRootDataNode sets many keys using the same hash to verify that they are not overwritten -// by each other. -func TestSetConflictingHashesOnRootDataNode(t *testing.T) { - const ( - numOfItems = 50 - snapshotID types.SnapshotID = 1 - keyHash types.KeyHash = 1 // Same key hash is intentionally used for all the items to test conflicts. - ) - - requireT := require.New(t) - - state, err := alloc.RunInTest(t, stateSize, nodesPerGroup) - requireT.NoError(err) - - s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKey) - - // Create - - for i := range uint8(numOfItems) { - v, err := s.NewEntry(snapshotID, TestKey[txtypes.Account]{ - Key: txtypes.Account{i}, - KeyHash: keyHash, - }, StagePointer0) - requireT.NoError(err) - requireT.NoError(s.SetKey(v, txtypes.Amount(i))) - } - - // Verify items exist. - - for i := range uint8(numOfItems) { - key := TestKey[txtypes.Account]{ - Key: txtypes.Account{i}, - KeyHash: keyHash, - } - amount := txtypes.Amount(i) - - v, err := s.NewEntry(snapshotID, key, StagePointer0) - requireT.NoError(err) - - exists, err := s.KeyExists(v) - requireT.NoError(err) - requireT.True(exists) - - balance, err := s.ReadKey(v) - requireT.NoError(err) - requireT.Equal(amount, balance) - - balance, exists = s.Query(key) - requireT.True(exists) - requireT.Equal(amount, balance) - } - - // Update every second item. - - for i := uint8(0); i < numOfItems; i += 2 { - key := TestKey[txtypes.Account]{ - Key: txtypes.Account{i}, - KeyHash: keyHash, - } - amount := txtypes.Amount(10 * i) - - v, err := s.NewEntry(snapshotID, key, StagePointer0) - requireT.NoError(err) - requireT.NoError(s.SetKey(v, amount)) - - v2, err := s.NewEntry(snapshotID, key, StagePointer0) - requireT.NoError(err) - - exists, err := s.KeyExists(v2) - requireT.NoError(err) - requireT.True(exists) - - balance, err := s.ReadKey(v2) - requireT.NoError(err) - requireT.Equal(amount, balance) - - balance, exists = s.Query(key) - requireT.True(exists) - requireT.Equal(amount, balance) - } - - // Verify all the other items stay untouched. - - for i := uint8(1); i < numOfItems; i += 2 { - key := TestKey[txtypes.Account]{ - Key: txtypes.Account{i}, - KeyHash: keyHash, - } - amount := txtypes.Amount(i) - - v, err := s.NewEntry(snapshotID, key, StagePointer0) - requireT.NoError(err) - - exists, err := s.KeyExists(v) - requireT.NoError(err) - requireT.True(exists) - - balance, err := s.ReadKey(v) - requireT.NoError(err) - requireT.Equal(amount, balance) - - balance, exists = s.Query(key) - requireT.True(exists) - requireT.Equal(amount, balance) - } - - // Delete every second item. - - for i := uint8(0); i < numOfItems; i += 2 { - key := TestKey[txtypes.Account]{ - Key: txtypes.Account{i}, - KeyHash: keyHash, - } - - v, err := s.NewEntry(snapshotID, key, StagePointer0) - requireT.NoError(err) - requireT.NoError(s.DeleteKey(v)) - - v2, err := s.NewEntry(snapshotID, key, StagePointer0) - requireT.NoError(err) - - exists, err := s.KeyExists(v2) - requireT.NoError(err) - requireT.False(exists) - - balance, err := s.ReadKey(v2) - requireT.NoError(err) - requireT.Equal(txtypes.Amount(0), balance) - - balance, exists = s.Query(key) - requireT.False(exists) - requireT.Equal(txtypes.Amount(0), balance) - } - - // Verify all the other items still exist. - - for i := uint8(1); i < numOfItems; i += 2 { - key := TestKey[txtypes.Account]{ - Key: txtypes.Account{i}, - KeyHash: keyHash, - } - amount := txtypes.Amount(i) - - v, err := s.NewEntry(snapshotID, key, StagePointer0) - requireT.NoError(err) - - exists, err := s.KeyExists(v) - requireT.NoError(err) - requireT.True(exists) - - balance, err := s.ReadKey(v) - requireT.NoError(err) - requireT.Equal(amount, balance) - - balance, exists = s.Query(key) - requireT.True(exists) - requireT.Equal(amount, balance) - } -} diff --git a/space/space.go b/space/space.go index 6c680f0..c23708f 100644 --- a/space/space.go +++ b/space/space.go @@ -544,7 +544,7 @@ func (s *Space[K, V]) set( // Try to split data node. if v.storeRequest.PointersToStore > 1 { - splitDone, err := s.splitDataNode(v.snapshotID, tx, walRecorder, allocator, v.parentIndex, + splitDone, err := s.splitDataNodeWithoutConflict(v.snapshotID, tx, walRecorder, allocator, v.parentIndex, v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level) if err != nil { return err @@ -559,7 +559,7 @@ func (s *Space[K, V]) set( } // Add pointer node. - if err := s.addPointerNode(v, tx, walRecorder, allocator, conflict); err != nil { + if err := s.addPointerNode(v, tx, walRecorder, allocator, conflict, hashBuff, hashKeyFunc); err != nil { return err } @@ -586,7 +586,7 @@ func (s *Space[K, V]) splitToIndex(parentNodeAddress types.NodeAddress, index ui return index, uint64(math.MaxUint64) << bits.TrailingZeros64(mask) } -func (s *Space[K, V]) splitDataNode( +func (s *Space[K, V]) splitDataNodeWithoutConflict( snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, @@ -680,12 +680,117 @@ func (s *Space[K, V]) splitDataNode( return true, nil } +func (s *Space[K, V]) splitDataNodeWithConflict( + snapshotID types.SnapshotID, + tx *pipeline.TransactionRequest, + walRecorder *wal.Recorder, + allocator *alloc.Allocator, + index uint64, + parentNodePointer *types.Pointer, + level uint8, + hashBuff []byte, + hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash, +) (bool, error) { + newIndex, mask := s.splitToIndex(types.Load(&parentNodePointer.VolatileAddress), index) + if newIndex == index { + return false, nil + } + + newNodeVolatileAddress, err := allocator.Allocate() + if err != nil { + return false, err + } + s.config.State.Clear(newNodeVolatileAddress) + + newNodePersistentAddress, err := allocator.Allocate() + if err != nil { + return false, err + } + s.config.State.Clear(newNodePersistentAddress) + + parentNode := ProjectPointerNode(s.config.State.Node(types.Load(&parentNodePointer.VolatileAddress).Naked())) + existingNodePointer := &parentNode.Pointers[index] + existingDataNode := s.config.State.Node(types.Load(&existingNodePointer.VolatileAddress)) + newDataNode := s.config.State.Node(newNodeVolatileAddress) + keyHashes := s.config.DataNodeAssistant.KeyHashes(existingDataNode) + newKeyHashes := s.config.DataNodeAssistant.KeyHashes(newDataNode) + + for i, item := range s.config.DataNodeAssistant.Iterator(existingDataNode) { + keyHash := hashKeyFunc(&item.Key, hashBuff, level-1) + itemIndex := PointerIndex(keyHash, level-1) + if itemIndex&mask != newIndex { + if err := wal.Set1(walRecorder, tx, existingNodePointer.PersistentAddress, + &keyHashes[i], &keyHash, + ); err != nil { + return false, err + } + + continue + } + + newDataNodeItem := s.config.DataNodeAssistant.Item(newDataNode, s.config.DataNodeAssistant.ItemOffset(i)) + if err := wal.Copy(walRecorder, tx, newNodePersistentAddress, existingNodePointer.PersistentAddress, + newDataNodeItem, item, + ); err != nil { + return false, err + } + if err := wal.Set1(walRecorder, tx, newNodePersistentAddress, + &newKeyHashes[i], &keyHash, + ); err != nil { + return false, err + } + if err := wal.Set1(walRecorder, tx, existingNodePointer.PersistentAddress, + &keyHashes[i], zeroKeyHashPtr, + ); err != nil { + return false, err + } + } + + if err := wal.Set2(walRecorder, tx, parentNodePointer.PersistentAddress, + &parentNode.Pointers[newIndex].SnapshotID, &snapshotID, + &parentNode.Pointers[newIndex].PersistentAddress, &newNodePersistentAddress, + ); err != nil { + return false, err + } + + if err := wal.SetAddressAtomically(walRecorder, tx, parentNodePointer.PersistentAddress, + &parentNode.Pointers[newIndex].VolatileAddress, &newNodeVolatileAddress, + ); err != nil { + return false, err + } + + tx.AddStoreRequest(&pipeline.StoreRequest{ + Store: [pipeline.StoreCapacity]types.NodeRoot{ + { + Hash: &parentNode.Hashes[index], + Pointer: &parentNode.Pointers[index], + }, + }, + PointersToStore: 1, + NoSnapshots: s.config.NoSnapshots, + }) + tx.AddStoreRequest(&pipeline.StoreRequest{ + Store: [pipeline.StoreCapacity]types.NodeRoot{ + { + Hash: &parentNode.Hashes[newIndex], + Pointer: &parentNode.Pointers[newIndex], + }, + }, + PointersToStore: 1, + NoSnapshots: s.config.NoSnapshots, + }) + + return true, nil +} + func (s *Space[K, V]) addPointerNode( v *Entry[K, V], tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, conflict bool, + hashBuff []byte, + hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash, ) error { pointerNodeVolatileAddress, err := allocator.Allocate() if err != nil { @@ -740,7 +845,13 @@ func (s *Space[K, V]) addPointerNode( types.Store(&pointerNodeRoot.Pointer.VolatileAddress, pointerNodeVolatileAddress) } - _, err = s.splitDataNode(v.snapshotID, tx, walRecorder, allocator, 0, pointerNodeRoot.Pointer, v.level+1) + if conflict { + _, err = s.splitDataNodeWithConflict(v.snapshotID, tx, walRecorder, allocator, 0, pointerNodeRoot.Pointer, + v.level+1, hashBuff, hashKeyFunc) + } else { + _, err = s.splitDataNodeWithoutConflict(v.snapshotID, tx, walRecorder, allocator, 0, pointerNodeRoot.Pointer, + v.level+1) + } return err } diff --git a/space/test.go b/space/test.go index 65969ec..f4f3c40 100644 --- a/space/test.go +++ b/space/test.go @@ -94,15 +94,21 @@ func (s *SpaceTest[K, V]) SetKey(v *Entry[K, V], value V) error { } // SplitDataNode splits data node. -func (s *SpaceTest[K, V]) SplitDataNode(v *Entry[K, V], snapshotID types.SnapshotID) error { - _, err := s.s.splitDataNode(snapshotID, s.tx, s.walRecorder, s.allocator, v.parentIndex, - v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level) +func (s *SpaceTest[K, V]) SplitDataNode(v *Entry[K, V], snapshotID types.SnapshotID, conflict bool) error { + var err error + if conflict { + _, err = s.s.splitDataNodeWithConflict(snapshotID, s.tx, s.walRecorder, s.allocator, v.parentIndex, + v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level, s.hashBuff, s.hashKeyFunc) + } else { + _, err = s.s.splitDataNodeWithoutConflict(snapshotID, s.tx, s.walRecorder, s.allocator, v.parentIndex, + v.storeRequest.Store[v.storeRequest.PointersToStore-2].Pointer, v.level) + } return err } // AddPointerNode adds pointer node. func (s *SpaceTest[K, V]) AddPointerNode(v *Entry[K, V], conflict bool) error { - return s.s.addPointerNode(v, s.tx, s.walRecorder, s.allocator, conflict) + return s.s.addPointerNode(v, s.tx, s.walRecorder, s.allocator, conflict, s.hashBuff, s.hashKeyFunc) } // WalkPointers walk all the pointers to find the key.