From 37659a827f0a2eda1c06dcabeb9846a3065abb20 Mon Sep 17 00:00:00 2001 From: Wojciech Malota-Wojcik Date: Thu, 19 Dec 2024 20:57:04 +0100 Subject: [PATCH] Merge persistent writer and state --- alloc/test.go | 16 ------ benchmark_test.go | 27 ++-------- db.go | 43 +++++++--------- db_test.go | 4 +- hash/asm_test.go | 12 ++--- hash/blake3_test.go | 6 +-- list/list.go | 14 +++--- list/list_test.go | 4 +- space/compare/benchmark_test.go | 4 +- space/space.go | 20 ++++---- space/space_test.go | 72 +++++++++++++-------------- space/test.go | 6 +-- {alloc => state}/allocator.go | 2 +- {alloc => state}/allocator_test.go | 2 +- {alloc => state}/mem.go | 2 +- {alloc => state}/mem_test.go | 2 +- {alloc => state}/ring.go | 2 +- {alloc => state}/ring_test.go | 2 +- {alloc => state}/state.go | 62 ++++++++++++++--------- {alloc => state}/state_test.go | 24 ++------- state/test.go | 31 ++++++++++++ persistent/file.go => state/writer.go | 57 ++++----------------- tx/genesis/genesis.go | 4 +- tx/transfer/transfer.go | 4 +- 24 files changed, 187 insertions(+), 235 deletions(-) delete mode 100644 alloc/test.go rename {alloc => state}/allocator.go (99%) rename {alloc => state}/allocator_test.go (99%) rename {alloc => state}/mem.go (99%) rename {alloc => state}/mem_test.go (96%) rename {alloc => state}/ring.go (98%) rename {alloc => state}/ring_test.go (99%) rename {alloc => state}/state.go (71%) rename {alloc => state}/state_test.go (87%) create mode 100644 state/test.go rename persistent/file.go => state/writer.go (82%) diff --git a/alloc/test.go b/alloc/test.go deleted file mode 100644 index bccb58f..0000000 --- a/alloc/test.go +++ /dev/null @@ -1,16 +0,0 @@ -package alloc - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -// NewForTest creates state for unit tests. -func NewForTest(t *testing.T, size uint64) *State { - state, stateDeallocFunc, err := NewState(size, 2*size, false) - require.NoError(t, err) - t.Cleanup(stateDeallocFunc) - - return state -} diff --git a/benchmark_test.go b/benchmark_test.go index 224e981..9dc1781 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -10,13 +10,11 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/require" - "golang.org/x/sys/unix" "github.com/outofforest/logger" "github.com/outofforest/parallel" "github.com/outofforest/quantum" - "github.com/outofforest/quantum/alloc" - "github.com/outofforest/quantum/persistent" + "github.com/outofforest/quantum/state" "github.com/outofforest/quantum/tx/genesis" "github.com/outofforest/quantum/tx/transfer" txtypes "github.com/outofforest/quantum/tx/types" @@ -61,27 +59,19 @@ func BenchmarkBalanceTransfer(b *testing.B) { func() { _, _ = rand.Read(accountBytes) - store, err := fileStore( + var size uint64 = 10 * 1024 * 1024 * 1024 + state, stateDeallocFunc, err := state.New( + size, true, // "./db.quantum", "./disk", ) if err != nil { panic(err) } - - var size uint64 = 10 * 1024 * 1024 * 1024 - state, stateDeallocFunc, err := alloc.NewState( - size, store.Size(), - true, - ) - if err != nil { - panic(err) - } defer stateDeallocFunc() db := quantum.New(quantum.Config{ State: state, - Store: store, }) ctx, cancel := context.WithCancel(logger.WithLogger(context.Background(), logger.New(logger.DefaultConfig))) @@ -173,12 +163,3 @@ func BenchmarkBalanceTransfer(b *testing.B) { }() } } - -func fileStore(path string) (*persistent.Store, error) { - file, err := os.OpenFile(path, os.O_RDWR|unix.O_DIRECT, 0o600) - if err != nil { - return nil, err - } - - return persistent.NewStore(file) -} diff --git a/db.go b/db.go index 894d330..3d727c2 100644 --- a/db.go +++ b/db.go @@ -11,12 +11,11 @@ import ( "github.com/outofforest/parallel" "github.com/outofforest/photon" - "github.com/outofforest/quantum/alloc" "github.com/outofforest/quantum/hash" "github.com/outofforest/quantum/list" - "github.com/outofforest/quantum/persistent" "github.com/outofforest/quantum/pipeline" "github.com/outofforest/quantum/space" + "github.com/outofforest/quantum/state" "github.com/outofforest/quantum/tx/genesis" "github.com/outofforest/quantum/tx/transfer" txtypes "github.com/outofforest/quantum/tx/types" @@ -26,8 +25,7 @@ import ( // Config stores snapshot configuration. type Config struct { - State *alloc.State - Store *persistent.Store + State *state.State } // SpaceToCommit represents requested space which might require to be committed. @@ -117,8 +115,6 @@ func (db *DB) Close() { // Run runs db goroutines. func (db *DB) Run(ctx context.Context) error { - defer db.config.Store.Close() - return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error { supervisorReader := db.queueReader prepareTxReader := pipeline.CloneReader(db.queueReader) @@ -223,8 +219,7 @@ func (db *DB) Run(ctx context.Context) error { return pointerHashReader.Run(ctx, pipe04UpdatePointerHashes(db.config.State)) }) spawn("pipe05StoreNodes", parallel.Fail, func(ctx context.Context) error { - storeWriter, err := db.config.Store.NewWriter(db.config.State.Origin(), - db.config.State.VolatileSize()) + storeWriter, err := db.config.State.NewPersistentWriter() if err != nil { return err } @@ -259,7 +254,7 @@ func pipe01PrepareTransaction(balanceSpace *space.Space[txtypes.Account, txtypes } func pipe02ExecuteTransaction( - state *alloc.State, + state *state.State, snapshotID *types.SnapshotID, snapshotInfo *types.SnapshotInfo, snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo], @@ -358,7 +353,7 @@ func pipe02ExecuteTransaction( } } -func pipe03UpdateDataHashes(state *alloc.State, modMask, mod uint64) pipeline.TxFunc { +func pipe03UpdateDataHashes(state *state.State, modMask, mod uint64) pipeline.TxFunc { var matrix [16]*byte matrixP := &matrix[0] @@ -473,7 +468,7 @@ func (s *slot) Read() error { return nil } -func pipe04UpdatePointerHashes(state *alloc.State) pipeline.TxFunc { +func pipe04UpdatePointerHashes(state *state.State) pipeline.TxFunc { r := &reader{} var slots [16]*slot @@ -580,7 +575,7 @@ func pipe04UpdatePointerHashes(state *alloc.State) pipeline.TxFunc { } } -func pipe05StoreNodes(storeWriter *persistent.Writer) pipeline.TxFunc { +func pipe05StoreNodes(storeWriter *state.Writer) pipeline.TxFunc { return func(tx *pipeline.TransactionRequest, readCount uint64) (uint64, error) { for lr := tx.ListRequest; lr != nil; lr = lr.Next { for i := range lr.ListsToStore { @@ -613,11 +608,11 @@ func pipe05StoreNodes(storeWriter *persistent.Writer) pipeline.TxFunc { func deleteSnapshot( snapshotID types.SnapshotID, deleteSnapshotID types.SnapshotID, - state *alloc.State, + state *state.State, tx *pipeline.TransactionRequest, - volatileAllocator *alloc.Allocator[types.VolatileAddress], - volatileDeallocator *alloc.Deallocator[types.VolatileAddress], - persistentDeallocator *alloc.Deallocator[types.PersistentAddress], + volatileAllocator *state.Allocator[types.VolatileAddress], + volatileDeallocator *state.Deallocator[types.VolatileAddress], + persistentDeallocator *state.Deallocator[types.PersistentAddress], snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo], deallocationNodeAssistant *space.DataNodeAssistant[deallocationKey, types.ListRoot], ) error { @@ -711,11 +706,11 @@ func commit( snapshotID types.SnapshotID, snapshotInfo types.SnapshotInfo, tx *pipeline.TransactionRequest, - state *alloc.State, + state *state.State, deallocationListsToCommit map[types.SnapshotID]*types.ListRoot, - volatileAllocator *alloc.Allocator[types.VolatileAddress], - persistentAllocator *alloc.Allocator[types.PersistentAddress], - persistentDeallocator *alloc.Deallocator[types.PersistentAddress], + volatileAllocator *state.Allocator[types.VolatileAddress], + persistentAllocator *state.Allocator[types.PersistentAddress], + persistentDeallocator *state.Deallocator[types.PersistentAddress], snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo], deallocationListSpace *space.Space[deallocationKey, types.ListRoot], ) error { @@ -815,14 +810,14 @@ func commit( } func deallocateNode( - state *alloc.State, + state *state.State, snapshotID types.SnapshotID, nodeSnapshotID types.SnapshotID, nodeAddress types.PersistentAddress, deallocationListsToCommit map[types.SnapshotID]*types.ListRoot, - volatileAllocator *alloc.Allocator[types.VolatileAddress], - persistentAllocator *alloc.Allocator[types.PersistentAddress], - persistentDeallocator *alloc.Deallocator[types.PersistentAddress], + volatileAllocator *state.Allocator[types.VolatileAddress], + persistentAllocator *state.Allocator[types.PersistentAddress], + persistentDeallocator *state.Deallocator[types.PersistentAddress], immediateDeallocation bool, ) (types.ListRoot, error) { // Latest persistent snapshot cannot be deleted, so there is no gap between that snapshot and the pending one. diff --git a/db_test.go b/db_test.go index 5458582..4a759e9 100644 --- a/db_test.go +++ b/db_test.go @@ -6,9 +6,9 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/require" - "github.com/outofforest/quantum/alloc" "github.com/outofforest/quantum/pipeline" "github.com/outofforest/quantum/space" + "github.com/outofforest/quantum/state" txtypes "github.com/outofforest/quantum/tx/types" "github.com/outofforest/quantum/types" ) @@ -35,7 +35,7 @@ func TestPipe01PrepareTransactionsDoesNothingIfTransactionIsNil(t *testing.T) { } func newSpace(t *testing.T) *space.Space[txtypes.Account, txtypes.Amount] { - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) dataNodeAssistant, err := space.NewDataNodeAssistant[txtypes.Account, txtypes.Amount]() require.NoError(t, err) diff --git a/hash/asm_test.go b/hash/asm_test.go index e65dfcf..0452129 100644 --- a/hash/asm_test.go +++ b/hash/asm_test.go @@ -13,18 +13,18 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/outofforest/quantum/alloc" + "github.com/outofforest/quantum/state" "github.com/outofforest/quantum/types" ) var ( zeroNode = func() []byte { - b, _, _ := alloc.Allocate(types.NodeLength, 64, false) + b, _, _ := state.Allocate(types.NodeLength, 64, false) return unsafe.Slice((*byte)(b), types.NodeLength) }() zn = &zeroNode[0] oneNode = func() []byte { - b, _, _ := alloc.Allocate(types.NodeLength, 64, false) + b, _, _ := state.Allocate(types.NodeLength, 64, false) bSlice := unsafe.Slice((*byte)(b), types.NodeLength) for i := range bSlice { bSlice[i] = 0xff @@ -48,7 +48,7 @@ func TestBlake3OneMessage(t *testing.T) { matrix := zeroMatrix matrix[i] = on - hashesP, hashesDealloc, err := alloc.Allocate(16*types.HashLength, 32, false) + hashesP, hashesDealloc, err := state.Allocate(16*types.HashLength, 32, false) require.NoError(t, err) t.Cleanup(hashesDealloc) @@ -74,7 +74,7 @@ func TestBlake3OneMessage(t *testing.T) { func TestBlake3Zeros(t *testing.T) { matrix := zeroMatrix - hashesP, hashesDealloc, err := alloc.Allocate(16*types.HashLength, 32, false) + hashesP, hashesDealloc, err := state.Allocate(16*types.HashLength, 32, false) require.NoError(t, err) t.Cleanup(hashesDealloc) @@ -97,7 +97,7 @@ func TestLastHashIsStored(t *testing.T) { matrix := zeroMatrix matrix[i] = on - hashesP, hashesDealloc, err := alloc.Allocate(2*types.HashLength, 32, false) + hashesP, hashesDealloc, err := state.Allocate(2*types.HashLength, 32, false) require.NoError(t, err) t.Cleanup(hashesDealloc) diff --git a/hash/blake3_test.go b/hash/blake3_test.go index 5d865d7..1980db7 100644 --- a/hash/blake3_test.go +++ b/hash/blake3_test.go @@ -12,13 +12,13 @@ import ( blake3zeebo "github.com/zeebo/blake3" blake3luke "lukechampine.com/blake3" - "github.com/outofforest/quantum/alloc" + "github.com/outofforest/quantum/state" "github.com/outofforest/quantum/types" ) //nolint:unparam func randData(size uint64) []byte { - dataP, _, _ := alloc.Allocate(size, 64, false) + dataP, _, _ := state.Allocate(size, 64, false) data := unsafe.Slice((*byte)(dataP), size) if _, err := rand.Read(data); err != nil { panic(err) @@ -88,7 +88,7 @@ func BenchmarkChecksum4KAVX(b *testing.B) { var z [16]*byte for i := range z { - zP, dealloc, err := alloc.Allocate(types.HashLength, 32, false) + zP, dealloc, err := state.Allocate(types.HashLength, 32, false) require.NoError(b, err) b.Cleanup(dealloc) z[i] = (*byte)(zP) diff --git a/list/list.go b/list/list.go index 201a8c9..36a14de 100644 --- a/list/list.go +++ b/list/list.go @@ -1,7 +1,7 @@ package list import ( - "github.com/outofforest/quantum/alloc" + "github.com/outofforest/quantum/state" "github.com/outofforest/quantum/types" ) @@ -9,9 +9,9 @@ import ( func Add( listRoot *types.ListRoot, nodeAddress types.PersistentAddress, - state *alloc.State, - volatileAllocator *alloc.Allocator[types.VolatileAddress], - persistentAllocator *alloc.Allocator[types.PersistentAddress], + state *state.State, + volatileAllocator *state.Allocator[types.VolatileAddress], + persistentAllocator *state.Allocator[types.PersistentAddress], ) (types.ListRoot, error) { if listRoot.VolatileAddress == types.FreeAddress { var err error @@ -65,9 +65,9 @@ func Add( // Deallocate deallocates nodes referenced by the list. func Deallocate( listRoot types.ListRoot, - state *alloc.State, - volatileDeallocator *alloc.Deallocator[types.VolatileAddress], - persistentDeallocator *alloc.Deallocator[types.PersistentAddress], + state *state.State, + volatileDeallocator *state.Deallocator[types.VolatileAddress], + persistentDeallocator *state.Deallocator[types.PersistentAddress], ) error { for { // It is safe to do deallocations here because deallocated nodes are not reallocated until commit is finalized. diff --git a/list/list_test.go b/list/list_test.go index 0bdf380..ede63c9 100644 --- a/list/list_test.go +++ b/list/list_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/outofforest/quantum/alloc" + "github.com/outofforest/quantum/state" "github.com/outofforest/quantum/types" ) @@ -15,7 +15,7 @@ const stateSize = (numOfAddresses + 6) * types.NodeLength func TestList(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) volatileAllocator := state.NewVolatileAllocator() persistentAllocator := state.NewPersistentAllocator() diff --git a/space/compare/benchmark_test.go b/space/compare/benchmark_test.go index 0ef9efa..e29fddf 100644 --- a/space/compare/benchmark_test.go +++ b/space/compare/benchmark_test.go @@ -6,7 +6,7 @@ import ( "testing" "unsafe" - "github.com/outofforest/quantum/alloc" + "github.com/outofforest/quantum/state" ) var ( @@ -14,7 +14,7 @@ var ( values = func() []uint64 { const numOfItems = 32 - p, _, _ := alloc.Allocate(numOfItems*8, 64, false) + p, _, _ := state.Allocate(numOfItems*8, 64, false) result := unsafe.Slice((*uint64)(p), numOfItems) copy(result, []uint64{ diff --git a/space/space.go b/space/space.go index 07d4c7d..963ea07 100644 --- a/space/space.go +++ b/space/space.go @@ -9,9 +9,9 @@ import ( "github.com/cespare/xxhash" "github.com/outofforest/photon" - "github.com/outofforest/quantum/alloc" "github.com/outofforest/quantum/pipeline" "github.com/outofforest/quantum/space/compare" + "github.com/outofforest/quantum/state" "github.com/outofforest/quantum/types" ) @@ -25,7 +25,7 @@ const ( // Config stores space configuration. type Config[K, V comparable] struct { SpaceRoot types.NodeRoot - State *alloc.State + State *state.State DataNodeAssistant *DataNodeAssistant[K, V] DeletionCounter *uint64 NoSnapshots bool @@ -147,7 +147,7 @@ func (s *Space[K, V]) DeleteKey( func (s *Space[K, V]) SetKey( v *Entry[K, V], tx *pipeline.TransactionRequest, - allocator *alloc.Allocator[types.VolatileAddress], + allocator *state.Allocator[types.VolatileAddress], value V, ) error { v.item.Value = value @@ -286,7 +286,7 @@ func (s *Space[K, V]) set( v *Entry[K, V], volatileAddress types.VolatileAddress, tx *pipeline.TransactionRequest, - volatileAllocator *alloc.Allocator[types.VolatileAddress], + volatileAllocator *state.Allocator[types.VolatileAddress], ) error { volatileAddress = s.walkPointers(v, volatileAddress) @@ -368,7 +368,7 @@ func (s *Space[K, V]) splitToIndex(parentNodeAddress types.VolatileAddress, inde func (s *Space[K, V]) splitDataNodeWithoutConflict( tx *pipeline.TransactionRequest, - volatileAllocator *alloc.Allocator[types.VolatileAddress], + volatileAllocator *state.Allocator[types.VolatileAddress], index uint64, parentNodeAddress types.VolatileAddress, level uint8, @@ -437,7 +437,7 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict( func (s *Space[K, V]) splitDataNodeWithConflict( tx *pipeline.TransactionRequest, - volatileAllocator *alloc.Allocator[types.VolatileAddress], + volatileAllocator *state.Allocator[types.VolatileAddress], index uint64, parentNodeAddress types.VolatileAddress, level uint8, @@ -511,7 +511,7 @@ func (s *Space[K, V]) splitDataNodeWithConflict( func (s *Space[K, V]) addPointerNode( v *Entry[K, V], tx *pipeline.TransactionRequest, - volatileAllocator *alloc.Allocator[types.VolatileAddress], + volatileAllocator *state.Allocator[types.VolatileAddress], conflict bool, ) (types.VolatileAddress, error) { pointerNodeVolatileAddress, err := volatileAllocator.Allocate() @@ -678,10 +678,10 @@ type Entry[K, V comparable] struct { // IteratorAndDeallocator iterates over items and deallocates space. func IteratorAndDeallocator[K, V comparable]( spaceRoot types.Pointer, - state *alloc.State, + state *state.State, dataNodeAssistant *DataNodeAssistant[K, V], - volatileDeallocator *alloc.Deallocator[types.VolatileAddress], - persistentDeallocator *alloc.Deallocator[types.PersistentAddress], + volatileDeallocator *state.Deallocator[types.VolatileAddress], + persistentDeallocator *state.Deallocator[types.PersistentAddress], ) func(func(item *DataItem[K, V]) bool) { return func(yield func(item *DataItem[K, V]) bool) { if isFree(spaceRoot.VolatileAddress) { diff --git a/space/space_test.go b/space/space_test.go index 9cec407..1e27646 100644 --- a/space/space_test.go +++ b/space/space_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/outofforest/quantum/alloc" + "github.com/outofforest/quantum/state" txtypes "github.com/outofforest/quantum/tx/types" "github.com/outofforest/quantum/types" ) @@ -140,7 +140,7 @@ func TestCRUDOnRootDataNode(t *testing.T) { } ) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKey) @@ -255,7 +255,7 @@ func TestSetConflictingHashesOnRootDataNode(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKey) @@ -382,7 +382,7 @@ func TestAddingPointerNodeWithoutConflictResolution(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -452,7 +452,7 @@ func TestAddingPointerNodeWithConflictResolution(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, hashKeyFunc) @@ -524,7 +524,7 @@ func TestAddingPointerNodeForNonConflictingDataItems(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -596,7 +596,7 @@ func TestDataNodeSplitWithoutConflictResolution(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -739,7 +739,7 @@ func TestDataNodeSplitWithConflictResolution(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) hashKeyFunc := func(key *txtypes.Account, buff []byte, level uint8) types.KeyHash { return types.KeyHash(key[0]) @@ -883,7 +883,7 @@ func TestDataNodeSplitWithConflictResolution(t *testing.T) { func TestFindingAvailableFreeSlot(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -909,7 +909,7 @@ func TestFindStages(t *testing.T) { // This key hash means that item will always go to the pointer at index 0. const keyHash = 1 << 63 - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -978,7 +978,7 @@ func TestSwitchingFromMutableToImmutablePath(t *testing.T) { // After first split this key hash stays in data node 0, but after second split it will go to the data node 16. const keyHash types.KeyHash = 16 - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1080,7 +1080,7 @@ func TestExistsReturnsFalseIfKeyHashIsDifferent(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1116,7 +1116,7 @@ func TestExistsReturnsFalseIfKeyIsDifferent(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1153,7 +1153,7 @@ func TestExistsReturnsTrueAfterReplacingItem(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1213,7 +1213,7 @@ func TestExistsReturnsTrueAfterMovingItem(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1271,7 +1271,7 @@ func TestExistsReturnsTrueAfterMovingItem(t *testing.T) { func TestExistsReturnsFalseIfSlotHasNotBeenFound(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1311,7 +1311,7 @@ func TestReadReturnsDefaultValueIfKeyHashIsDifferent(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1347,7 +1347,7 @@ func TestReadReturnsDefaultValueIfKeyIsDifferent(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1384,7 +1384,7 @@ func TestReadReturnsCorrectValueAfterReplacingItem(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1444,7 +1444,7 @@ func TestReadReturnsCorrectValueAfterMovingItem(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1502,7 +1502,7 @@ func TestReadReturnsCorrectValueAfterMovingItem(t *testing.T) { func TestReadReturnsDefaultValueIfSlotHasNotBeenFound(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1536,7 +1536,7 @@ func TestReadReturnsDefaultValueIfSlotHasNotBeenFound(t *testing.T) { func TestDeletingOnEmptySpace(t *testing.T) { requireT := require.New(t) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1559,7 +1559,7 @@ func TestDeleteDoesNothingIfKeyHashIsDifferent(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1606,7 +1606,7 @@ func TestDeleteDoesNothingIfKeyIsDifferent(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1654,7 +1654,7 @@ func TestDeleteDoesNothingIfSlotIsFree(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1702,7 +1702,7 @@ func TestDeleteOnReplacedItem(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1772,7 +1772,7 @@ func TestDeleteOnMovedItem(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1836,7 +1836,7 @@ func TestDeleteDoesNothingIfSlotHasNotBeenFound(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1885,7 +1885,7 @@ func TestSetFindsAnotherSlotIfKeyHashIsDifferent(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1922,7 +1922,7 @@ func TestSetFindsAnotherSlotIfKeyIsDifferent(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -1961,7 +1961,7 @@ func TestSetOnReplacedItem(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -2031,7 +2031,7 @@ func TestSetOnMovedItem(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -2101,7 +2101,7 @@ func TestSetTheSameSlotTwiceUsingSameEntry(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -2133,7 +2133,7 @@ func TestSetTheSameSlotTwiceUsingDifferentEntry(t *testing.T) { KeyHash: 5, } - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[txtypes.Account, txtypes.Amount](t, state, nil) @@ -2159,7 +2159,7 @@ func TestSetManyItems(t *testing.T) { const numOfItems = 1000 - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[uint64, uint64](t, state, nil) @@ -2196,7 +2196,7 @@ func TestSetManyItemsWithConflicts(t *testing.T) { numOfItems = 1000 ) - state := alloc.NewForTest(t, stateSize) + state := state.NewForTest(t, stateSize) s := NewSpaceTest[uint64, uint64](t, state, nil) diff --git a/space/test.go b/space/test.go index a4adf9d..cc4ebd5 100644 --- a/space/test.go +++ b/space/test.go @@ -4,8 +4,8 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/require" - "github.com/outofforest/quantum/alloc" "github.com/outofforest/quantum/pipeline" + "github.com/outofforest/quantum/state" "github.com/outofforest/quantum/types" ) @@ -18,7 +18,7 @@ type TestKey[K comparable] struct { // NewSpaceTest creates new wrapper for space testing. func NewSpaceTest[K, V comparable]( t require.TestingT, - state *alloc.State, + state *state.State, hashKeyFunc func(key *K, buff []byte, level uint8) types.KeyHash, ) *SpaceTest[K, V] { dataNodeAssistant, err := NewDataNodeAssistant[K, V]() @@ -53,7 +53,7 @@ func NewSpaceTest[K, V comparable]( type SpaceTest[K, V comparable] struct { s *Space[K, V] tx *pipeline.TransactionRequest - allocator *alloc.Allocator[types.VolatileAddress] + allocator *state.Allocator[types.VolatileAddress] } // NewEntry initializes new entry. diff --git a/alloc/allocator.go b/state/allocator.go similarity index 99% rename from alloc/allocator.go rename to state/allocator.go index ce216d1..81c1217 100644 --- a/alloc/allocator.go +++ b/state/allocator.go @@ -1,4 +1,4 @@ -package alloc +package state import ( "github.com/outofforest/quantum/types" diff --git a/alloc/allocator_test.go b/state/allocator_test.go similarity index 99% rename from alloc/allocator_test.go rename to state/allocator_test.go index 035a0d5..2ee0806 100644 --- a/alloc/allocator_test.go +++ b/state/allocator_test.go @@ -1,4 +1,4 @@ -package alloc +package state import ( "testing" diff --git a/alloc/mem.go b/state/mem.go similarity index 99% rename from alloc/mem.go rename to state/mem.go index 6d3295c..88345aa 100644 --- a/alloc/mem.go +++ b/state/mem.go @@ -1,4 +1,4 @@ -package alloc +package state import ( "os" diff --git a/alloc/mem_test.go b/state/mem_test.go similarity index 96% rename from alloc/mem_test.go rename to state/mem_test.go index 3aca627..f070713 100644 --- a/alloc/mem_test.go +++ b/state/mem_test.go @@ -1,4 +1,4 @@ -package alloc +package state import ( "testing" diff --git a/alloc/ring.go b/state/ring.go similarity index 98% rename from alloc/ring.go rename to state/ring.go index 3cb95f5..9b31420 100644 --- a/alloc/ring.go +++ b/state/ring.go @@ -1,4 +1,4 @@ -package alloc +package state import ( "github.com/pkg/errors" diff --git a/alloc/ring_test.go b/state/ring_test.go similarity index 99% rename from alloc/ring_test.go rename to state/ring_test.go index 2bd7f75..e818e53 100644 --- a/alloc/ring_test.go +++ b/state/ring_test.go @@ -1,4 +1,4 @@ -package alloc +package state import ( "testing" diff --git a/alloc/state.go b/state/state.go similarity index 71% rename from alloc/state.go rename to state/state.go index e92019e..a077ca1 100644 --- a/alloc/state.go +++ b/state/state.go @@ -1,9 +1,12 @@ -package alloc +package state import ( + "io" + "os" "unsafe" "github.com/pkg/errors" + "golang.org/x/sys/unix" "github.com/outofforest/photon" "github.com/outofforest/quantum/types" @@ -11,20 +14,31 @@ import ( const numOfPersistentSingularityNodes = 8 -// NewState creates new DB state. -func NewState( - volatileSize, persistentSize uint64, +// New creates new DB state. +func New( + volatileSize uint64, useHugePages bool, + persistentPath string, ) (*State, func(), error) { // Align allocated memory address to the node length. It might be required if using O_DIRECT option to open // files. As a side effect it is also 64-byte aligned which is required by the AVX512 instructions. - dataP, deallocateFunc, err := Allocate(volatileSize, types.NodeLength, useHugePages) + origin, deallocateFunc, err := Allocate(volatileSize, types.NodeLength, useHugePages) if err != nil { return nil, nil, errors.Wrapf(err, "memory allocation failed") } + persistentFile, err := os.OpenFile(persistentPath, os.O_RDWR|unix.O_DIRECT, 0o600) + if err != nil { + return nil, nil, errors.WithStack(err) + } + + persistentSize, err := persistentFile.Seek(0, io.SeekEnd) + if err != nil { + return nil, nil, errors.WithStack(err) + } + volatileRing, singularityVolatileNodes := newAllocationRing[types.VolatileAddress](volatileSize, 1) - persistentRing, singularityPersistentNodes := newAllocationRing[types.PersistentAddress](persistentSize, + persistentRing, singularityPersistentNodes := newAllocationRing[types.PersistentAddress](uint64(persistentSize), numOfPersistentSingularityNodes) singularityNodeRoots := make([]types.ToStore, 0, numOfPersistentSingularityNodes) @@ -39,18 +53,23 @@ func NewState( } return &State{ - singularityNodeRoots: singularityNodeRoots, - dataP: dataP, - volatileSize: volatileSize, - volatileRing: volatileRing, - persistentRing: persistentRing, - }, deallocateFunc, nil + singularityNodeRoots: singularityNodeRoots, + origin: origin, + persistentFile: int32(persistentFile.Fd()), + volatileSize: volatileSize, + volatileRing: volatileRing, + persistentRing: persistentRing, + }, func() { + _ = persistentFile.Close() + deallocateFunc() + }, nil } // State stores the DB state. type State struct { singularityNodeRoots []types.ToStore - dataP unsafe.Pointer + origin unsafe.Pointer + persistentFile int32 volatileSize uint64 volatileRing *ring[types.VolatileAddress] persistentRing *ring[types.PersistentAddress] @@ -76,24 +95,19 @@ func (s *State) NewPersistentDeallocator() *Deallocator[types.PersistentAddress] return newDeallocator(s.persistentRing) } +// NewPersistentWriter creates new persistent writer. +func (s *State) NewPersistentWriter() (*Writer, error) { + return newWriter(s.persistentFile, s.origin, s.volatileSize) +} + // SingularityNodeRoot returns node root of singularity node. func (s *State) SingularityNodeRoot(snapshotID types.SnapshotID) types.ToStore { return s.singularityNodeRoots[snapshotID%numOfPersistentSingularityNodes] } -// Origin returns the pointer to the allocated memory. -func (s *State) Origin() unsafe.Pointer { - return s.dataP -} - -// VolatileSize returns size of the volatile memory. -func (s *State) VolatileSize() uint64 { - return s.volatileSize -} - // Node returns node bytes. func (s *State) Node(nodeAddress types.VolatileAddress) unsafe.Pointer { - return unsafe.Add(s.dataP, nodeAddress*types.NodeLength) + return unsafe.Add(s.origin, nodeAddress*types.NodeLength) } // Bytes returns byte slice of a node. diff --git a/alloc/state_test.go b/state/state_test.go similarity index 87% rename from alloc/state_test.go rename to state/state_test.go index 2336cbe..8783323 100644 --- a/alloc/state_test.go +++ b/state/state_test.go @@ -1,4 +1,4 @@ -package alloc +package state import ( "testing" @@ -108,29 +108,13 @@ func TestSingularityNodes(t *testing.T) { } } -func TestOrigin(t *testing.T) { - requireT := require.New(t) - - s := NewForTest(t, stateSize) - - requireT.Equal(s.Node(0), s.Origin()) -} - -func TestVolatileSize(t *testing.T) { - requireT := require.New(t) - - s := NewForTest(t, stateSize) - - requireT.Equal(uint64(stateSize), s.VolatileSize()) -} - func TestNode(t *testing.T) { requireT := require.New(t) s := NewForTest(t, stateSize) for i := range types.VolatileAddress(stateSize / types.NodeLength) { - requireT.Equal(uintptr(s.Node(i))-uintptr(s.Origin()), uintptr(i*types.NodeLength)) + requireT.Equal(uintptr(s.Node(i))-uintptr(s.origin), uintptr(i*types.NodeLength)) } } @@ -146,7 +130,7 @@ func TestBytes(t *testing.T) { } } - data := unsafe.Slice((*byte)(s.Origin()), stateSize) + data := unsafe.Slice((*byte)(s.origin), stateSize) for i, d := range data { requireT.Equal(byte(i/types.NodeLength), d) } @@ -167,7 +151,7 @@ func TestClear(t *testing.T) { } } - data := unsafe.Slice((*byte)(s.Origin()), stateSize) + data := unsafe.Slice((*byte)(s.origin), stateSize) for i, d := range data { addr := i / types.NodeLength if addr%2 == 0 { diff --git a/state/test.go b/state/test.go new file mode 100644 index 0000000..93e7850 --- /dev/null +++ b/state/test.go @@ -0,0 +1,31 @@ +package state + +import ( + "io" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +// NewForTest creates state for unit tests. +func NewForTest(t *testing.T, size uint64) *State { + dir := t.TempDir() + dbFile := filepath.Join(dir, "quantum.db") + f, err := os.OpenFile(dbFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + require.NoError(t, err) + defer f.Close() + + _, err = f.Seek(int64(2*size)-1, io.SeekStart) + require.NoError(t, err) + + _, err = f.Write([]byte{0x00}) + require.NoError(t, err) + + state, stateDeallocFunc, err := New(size, false, dbFile) + require.NoError(t, err) + t.Cleanup(stateDeallocFunc) + + return state +} diff --git a/persistent/file.go b/state/writer.go similarity index 82% rename from persistent/file.go rename to state/writer.go index 14c11b6..6231653 100644 --- a/persistent/file.go +++ b/state/writer.go @@ -1,8 +1,6 @@ -package persistent +package state import ( - "io" - "os" "runtime" "syscall" "time" @@ -17,56 +15,21 @@ import ( const ( // We found that when this value is too large, together with O_DIRECT option it produces random ECANCELED errors // on CQE. Our guess for now is that it might be caused by some queue overflows in the NVME device. - submitCount = 2 << 5 - mod = submitCount - 1 - ringCapacity = 10 * submitCount + submitCount = 2 << 5 + mod = submitCount - 1 + uringCapacity = 10 * submitCount ) -// NewStore creates new persistent store. -func NewStore(file *os.File) (*Store, error) { - size, err := file.Seek(0, io.SeekEnd) - if err != nil { - _ = file.Close() - return nil, errors.WithStack(err) - } - - return &Store{ - file: file, - size: uint64(size), - }, nil -} - -// Store defines persistent store. -type Store struct { - file *os.File - size uint64 -} - -// Size returns the size of the store. -func (s *Store) Size() uint64 { - return s.size -} - -// NewWriter creates new store writer. -func (s *Store) NewWriter(volatileOrigin unsafe.Pointer, volatileSize uint64) (*Writer, error) { - return newWriter(s.file, volatileOrigin, volatileSize) -} - -// Close closes the store. -func (s *Store) Close() { - _ = s.file.Close() -} - func newWriter( - file *os.File, + fileDescriptor int32, volatileOrigin unsafe.Pointer, volatileSize uint64, ) (*Writer, error) { // Required by uring.SetupSingleIssuer. runtime.LockOSThread() - ring, err := uring.New(ringCapacity, - uring.WithCQSize(ringCapacity), + ring, err := uring.New(uringCapacity, + uring.WithCQSize(uringCapacity), uring.WithFlags(uring.SetupSingleIssuer), uring.WithSQPoll(100*time.Millisecond), ) @@ -97,10 +60,10 @@ func newWriter( } return &Writer{ - fd: int32(file.Fd()), + fd: fileDescriptor, origin: volatileOrigin, ring: ring, - cqeBuff: make([]*uring.CQEvent, ringCapacity), + cqeBuff: make([]*uring.CQEvent, uringCapacity), }, nil } @@ -166,7 +129,7 @@ func (w *Writer) Write(dstAddress types.PersistentAddress, srcAddress types.Vola if _, err := w.ring.Submit(); err != nil { return errors.WithStack(err) } - if (w.numOfEvents + 1) >= ringCapacity { + if (w.numOfEvents + 1) >= uringCapacity { return w.awaitCompletionEvents(false) } } diff --git a/tx/genesis/genesis.go b/tx/genesis/genesis.go index 7f5b375..ff71561 100644 --- a/tx/genesis/genesis.go +++ b/tx/genesis/genesis.go @@ -1,9 +1,9 @@ package genesis import ( - "github.com/outofforest/quantum/alloc" "github.com/outofforest/quantum/pipeline" "github.com/outofforest/quantum/space" + "github.com/outofforest/quantum/state" txtypes "github.com/outofforest/quantum/tx/types" "github.com/outofforest/quantum/types" ) @@ -23,7 +23,7 @@ type Tx struct { func (t *Tx) Execute( s *space.Space[txtypes.Account, txtypes.Amount], tx *pipeline.TransactionRequest, - allocator *alloc.Allocator[types.VolatileAddress], + allocator *state.Allocator[types.VolatileAddress], ) error { for _, a := range t.Accounts { var v space.Entry[txtypes.Account, txtypes.Amount] diff --git a/tx/transfer/transfer.go b/tx/transfer/transfer.go index 6bf51fc..635ed54 100644 --- a/tx/transfer/transfer.go +++ b/tx/transfer/transfer.go @@ -5,9 +5,9 @@ import ( "github.com/pkg/errors" - "github.com/outofforest/quantum/alloc" "github.com/outofforest/quantum/pipeline" "github.com/outofforest/quantum/space" + "github.com/outofforest/quantum/state" txtypes "github.com/outofforest/quantum/tx/types" "github.com/outofforest/quantum/types" ) @@ -32,7 +32,7 @@ func (t *Tx) Prepare(s *space.Space[txtypes.Account, txtypes.Amount]) { func (t *Tx) Execute( s *space.Space[txtypes.Account, txtypes.Amount], tx *pipeline.TransactionRequest, - allocator *alloc.Allocator[types.VolatileAddress], + allocator *state.Allocator[types.VolatileAddress], ) error { fromBalance := s.ReadKey(&t.from) if fromBalance < t.Amount {