diff --git a/db.go b/db.go index 985ea62..1b5d211 100644 --- a/db.go +++ b/db.go @@ -81,7 +81,6 @@ func New(config Config) (*DB, error) { }, State: config.State, DataNodeAssistant: snapshotInfoNodeAssistant, - MassEntry: mass.New[space.Entry[types.SnapshotID, types.SnapshotInfo]](1000), NoSnapshots: true, }) @@ -92,7 +91,6 @@ func New(config Config) (*DB, error) { }, State: config.State, DataNodeAssistant: snapshotToPointerNodeAssistant, - MassEntry: mass.New[space.Entry[types.SnapshotID, types.Pointer]](1000), NoSnapshots: true, }, ) @@ -179,11 +177,14 @@ func (db *DB) Run(ctx context.Context) error { defer db.config.State.Close() return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error { - prepareTxReaders := make([]*pipeline.Reader, 0, 1) + supervisorReader := db.queueReader + prepareTxReader := pipeline.CloneReader(db.queueReader) + prepareTxReaders := make([]*pipeline.Reader, 0, 3) for range cap(prepareTxReaders) { - prepareTxReaders = append(prepareTxReaders, pipeline.CloneReader(db.queueReader)) + prepareTxReaders = append(prepareTxReaders, prepareTxReader) + prepareTxReader = pipeline.NewReader(prepareTxReader) } - executeTxReader := pipeline.NewReader(prepareTxReaders...) + executeTxReader := prepareTxReader deallocateReader := pipeline.NewReader(executeTxReader) copyReaders := make([]*pipeline.Reader, 0, 4) for range cap(copyReaders) { @@ -212,7 +213,7 @@ func (db *DB) Run(ctx context.Context) error { var processedCount uint64 for { - req, err := db.queueReader.Read(ctx) + req, err := supervisorReader.Read(ctx) if err != nil { if lastSyncCh != nil { lastSyncCh <- struct{}{} @@ -226,7 +227,7 @@ func (db *DB) Run(ctx context.Context) error { if req != nil { processedCount++ - db.queueReader.Acknowledge(processedCount, req) + supervisorReader.Acknowledge(processedCount, req) if req.Type == pipeline.Sync { lastSyncCh = req.SyncCh @@ -244,7 +245,7 @@ func (db *DB) Run(ctx context.Context) error { }) for i, reader := range prepareTxReaders { spawn(fmt.Sprintf("prepareTx-%02d", i), parallel.Fail, func(ctx context.Context) error { - return db.prepareTransactions(ctx, reader, uint64(len(prepareTxReaders)), uint64(i)) + return db.prepareTransactions(ctx, reader) }) } spawn("executeTx", parallel.Fail, func(ctx context.Context) error { @@ -290,15 +291,15 @@ func (db *DB) deleteSnapshot( walRecorder *wal.Recorder, allocator *alloc.Allocator, deallocator *alloc.Deallocator, - massSnapshotToPointerEntry *mass.Mass[space.Entry[types.SnapshotID, types.Pointer]], massStoreRequest *mass.Mass[pipeline.StoreRequest], snapshotHashBuff []byte, snapshotHashMatches []uint64, deallocationHashBuff []byte, deallocationHashMatches []uint64, ) error { - snapshotInfoValue, err := db.snapshots.Find(snapshotID, tx, walRecorder, allocator, snapshotID, snapshotHashBuff, - snapshotHashMatches) + var snapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo] + err := db.snapshots.Find(&snapshotInfoValue, snapshotID, tx, walRecorder, allocator, snapshotID, space.StageData, + snapshotHashBuff, snapshotHashMatches) if err != nil { return err } @@ -327,8 +328,9 @@ func (db *DB) deleteSnapshot( //nolint:nestif if snapshotInfo.NextSnapshotID < db.singularityNode.LastSnapshotID { - nextSnapshotInfoValue, err := db.snapshots.Find(snapshotID, tx, walRecorder, allocator, - snapshotInfo.NextSnapshotID, snapshotHashBuff, snapshotHashMatches) + var nextSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo] + err := db.snapshots.Find(&nextSnapshotInfoValue, snapshotID, tx, walRecorder, allocator, + snapshotInfo.NextSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches) if err != nil { return err } @@ -356,7 +358,6 @@ func (db *DB) deleteSnapshot( SpaceRoot: nextDeallocationListRoot, State: db.config.State, DataNodeAssistant: db.snapshotToPointerNodeAssistant, - MassEntry: massSnapshotToPointerEntry, }, ) } else { @@ -375,7 +376,6 @@ func (db *DB) deleteSnapshot( SpaceRoot: deallocationListsRoot, State: db.config.State, DataNodeAssistant: db.snapshotToPointerNodeAssistant, - MassEntry: massSnapshotToPointerEntry, }, ) @@ -391,8 +391,9 @@ func (db *DB) deleteSnapshot( continue } - deallocationListValue, err := deallocationLists.Find(snapshotID, tx, walRecorder, allocator, - nextDeallocSnapshot.Key, deallocationHashBuff, deallocationHashMatches) + var deallocationListValue space.Entry[types.SnapshotID, types.Pointer] + err := deallocationLists.Find(&deallocationListValue, snapshotID, tx, walRecorder, allocator, + nextDeallocSnapshot.Key, space.StageData, deallocationHashBuff, deallocationHashMatches) if err != nil { return err } @@ -459,8 +460,9 @@ func (db *DB) deleteSnapshot( nextSnapshotInfo.PreviousSnapshotID = snapshotInfo.PreviousSnapshotID if snapshotInfo.NextSnapshotID < db.singularityNode.LastSnapshotID { - nextSnapshotInfoValue, err := db.snapshots.Find(snapshotID, tx, walRecorder, allocator, - snapshotInfo.NextSnapshotID, snapshotHashBuff, snapshotHashMatches) + var nextSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo] + err := db.snapshots.Find(&nextSnapshotInfoValue, snapshotID, tx, walRecorder, allocator, + snapshotInfo.NextSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches) if err != nil { return err } @@ -479,8 +481,9 @@ func (db *DB) deleteSnapshot( //nolint:nestif if snapshotInfo.PreviousSnapshotID > 0 { - previousSnapshotInfoValue, err := db.snapshots.Find(snapshotID, tx, walRecorder, allocator, - snapshotInfo.PreviousSnapshotID, snapshotHashBuff, snapshotHashMatches) + var previousSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo] + err := db.snapshots.Find(&previousSnapshotInfoValue, snapshotID, tx, walRecorder, allocator, + snapshotInfo.PreviousSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches) if err != nil { return err } @@ -538,8 +541,9 @@ func (db *DB) commit( var sr *pipeline.StoreRequest for _, snapshotID := range lists { l := db.deallocationListsToCommit[snapshotID] - deallocationListValue, err := db.deallocationLists.Find(commitSnapshotID, tx, walRecorder, allocator, - snapshotID, deallocationHashBuff, deallocationHashMatches) + var deallocationListValue space.Entry[types.SnapshotID, types.Pointer] + err := db.deallocationLists.Find(&deallocationListValue, commitSnapshotID, tx, walRecorder, allocator, + snapshotID, space.StageData, deallocationHashBuff, deallocationHashMatches) if err != nil { return err } @@ -599,8 +603,9 @@ func (db *DB) commit( clear(db.deallocationListsToCommit) } - nextSnapshotInfoValue, err := db.snapshots.Find(commitSnapshotID, tx, walRecorder, allocator, - db.singularityNode.LastSnapshotID, snapshotHashBuff, snapshotHashMatches) + var nextSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo] + err := db.snapshots.Find(&nextSnapshotInfoValue, commitSnapshotID, tx, walRecorder, allocator, + db.singularityNode.LastSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches) if err != nil { return err } @@ -628,8 +633,6 @@ func (db *DB) commit( func (db *DB) prepareTransactions( ctx context.Context, pipeReader *pipeline.Reader, - divider uint64, - mod uint64, ) error { s, err := GetSpace[txtypes.Account, txtypes.Amount](spaces.Balances, db) if err != nil { @@ -648,7 +651,7 @@ func (db *DB) prepareTransactions( return err } - if processedCount%divider == mod && req.Transaction != nil { + if req.Transaction != nil { if transferTx, ok := req.Transaction.(*transfer.Tx); ok { if err := transferTx.Prepare(s, db.singularityNode.LastSnapshotID, req, walRecorder, allocator, hashBuff, hashMatches); err != nil { @@ -677,7 +680,6 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read deallocationHashBuff := db.deallocationLists.NewHashBuff() deallocationHashMatches := db.deallocationLists.NewHashMatches() - massSnapshotToPointerEntry := mass.New[space.Entry[types.SnapshotID, types.Pointer]](1000) massStoreRequest := mass.New[pipeline.StoreRequest](1000) walRecorder := wal.NewRecorder(db.config.State, allocator) @@ -707,8 +709,8 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read walRecorder.Commit(req) case *deleteSnapshotTx: if err := db.deleteSnapshot(tx.SnapshotID, req, walRecorder, allocator, deallocator, - massSnapshotToPointerEntry, massStoreRequest, snapshotHashBuff, snapshotHashMatches, - deallocationHashBuff, deallocationHashMatches); err != nil { + massStoreRequest, snapshotHashBuff, snapshotHashMatches, deallocationHashBuff, + deallocationHashMatches); err != nil { return err } case *genesis.Tx: @@ -1214,7 +1216,6 @@ func GetSpace[K, V comparable](spaceID types.SpaceID, db *DB) (*space.Space[K, V }, State: db.config.State, DataNodeAssistant: dataNodeAssistant, - MassEntry: mass.New[space.Entry[K, V]](1000), }), nil } diff --git a/space/space.go b/space/space.go index 5e0cb6d..193f704 100644 --- a/space/space.go +++ b/space/space.go @@ -9,7 +9,6 @@ import ( "github.com/cespare/xxhash" "github.com/samber/lo" - "github.com/outofforest/mass" "github.com/outofforest/photon" "github.com/outofforest/quantum/alloc" "github.com/outofforest/quantum/pipeline" @@ -18,6 +17,13 @@ import ( "github.com/outofforest/quantum/wal" ) +// Stage constants. +const ( + StagePointer0 uint8 = iota + StagePointer1 + StageData +) + var ( stateFreePtr = lo.ToPtr(types.StateFree) stateDataPtr = lo.ToPtr(types.StateData) @@ -30,7 +36,6 @@ type Config[K, V comparable] struct { SpaceRoot types.NodeRoot State *alloc.State DataNodeAssistant *DataNodeAssistant[K, V] - MassEntry *mass.Mass[Entry[K, V]] NoSnapshots bool } @@ -81,43 +86,44 @@ func (s *Space[K, V]) NewHashMatches() []uint64 { // Find locates key in the space. func (s *Space[K, V]) Find( + v *Entry[K, V], snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, key K, + stage uint8, hashBuff []byte, hashMatches []uint64, -) (*Entry[K, V], error) { - v := s.config.MassEntry.New() - initBytes := unsafe.Slice((*byte)(unsafe.Pointer(v)), s.initSize) - copy(initBytes, s.defaultInit) - v.keyHash = hashKey(&key, nil, 0) - v.item.Key = key - v.dataItemIndex = dataItemIndex(v.keyHash, s.numOfDataItems) - - if v.storeRequest.Store[0].Pointer.State != types.StateFree && - v.storeRequest.Store[0].Pointer.SnapshotID != snapshotID { - persistentAddress, err := allocator.Allocate() - if err != nil { - return nil, err - } - - if err := wal.Deallocate(walRecorder, tx, v.storeRequest.Store[0].Pointer, persistentAddress, - s.config.NoSnapshots); err != nil { - return nil, err - } +) error { + //nolint:nestif + if v.space == nil { + initBytes := unsafe.Slice((*byte)(unsafe.Pointer(v)), s.initSize) + copy(initBytes, s.defaultInit) + v.keyHash = hashKey(&key, nil, 0) + v.item.Key = key + v.dataItemIndex = dataItemIndex(v.keyHash, s.numOfDataItems) + v.stage = stage + + if s.config.SpaceRoot.Pointer.State != types.StateFree && + s.config.SpaceRoot.Pointer.SnapshotID != snapshotID { + persistentAddress, err := allocator.Allocate() + if err != nil { + return err + } - // This is not stored in WAL because space roots are stored separately on commit. - v.storeRequest.Store[0].Pointer.SnapshotID = snapshotID - v.storeRequest.Store[0].Pointer.PersistentAddress = persistentAddress - } + if err := wal.Deallocate(walRecorder, tx, s.config.SpaceRoot.Pointer, persistentAddress, + s.config.NoSnapshots); err != nil { + return err + } - if err := s.find(snapshotID, tx, walRecorder, allocator, v, hashBuff, hashMatches); err != nil { - return nil, err + // This is not stored in WAL because space roots are stored separately on commit. + s.config.SpaceRoot.Pointer.SnapshotID = snapshotID + s.config.SpaceRoot.Pointer.PersistentAddress = persistentAddress + } } - return v, nil + return s.find(snapshotID, tx, walRecorder, allocator, v, hashBuff, hashMatches) } // Query queries the key. @@ -405,6 +411,16 @@ func (s *Space[K, V]) find( return err } + switch { + case v.stage == StageData: + case v.stage == StagePointer0: + v.stage = StagePointer1 + return nil + case v.stage == StagePointer1: + v.stage = StageData + return nil + } + if v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.State != types.StateData { v.keyHashP = nil v.itemP = nil @@ -873,6 +889,10 @@ func (s *Space[K, V]) walkPointers( } else { v.nextDataNodeState = &pointerNode.Pointers[nextIndex].State } + + if v.stage == StagePointer0 && v.level == 3 { + return nil + } } return nil @@ -927,6 +947,7 @@ type Entry[K, V comparable] struct { parentIndex uint64 dataItemIndex uint64 exists bool + stage uint8 level uint8 } diff --git a/tx/genesis/genesis.go b/tx/genesis/genesis.go index 4fba388..cbbfd46 100644 --- a/tx/genesis/genesis.go +++ b/tx/genesis/genesis.go @@ -22,7 +22,7 @@ type Tx struct { // Execute executes transaction. func (t *Tx) Execute( - space *space.Space[txtypes.Account, txtypes.Amount], + s *space.Space[txtypes.Account, txtypes.Amount], snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, @@ -31,8 +31,9 @@ func (t *Tx) Execute( hashMatches []uint64, ) error { for _, a := range t.Accounts { - v, err := space.Find(snapshotID, tx, walRecorder, allocator, a.Account, hashBuff, hashMatches) - if err != nil { + var v space.Entry[txtypes.Account, txtypes.Amount] + if err := s.Find(&v, snapshotID, tx, walRecorder, allocator, a.Account, space.StageData, hashBuff, + hashMatches); err != nil { return err } diff --git a/tx/transfer/transfer.go b/tx/transfer/transfer.go index d45b2b6..ac9be9b 100644 --- a/tx/transfer/transfer.go +++ b/tx/transfer/transfer.go @@ -19,27 +19,25 @@ type Tx struct { To txtypes.Account Amount txtypes.Amount - from *space.Entry[txtypes.Account, txtypes.Amount] - to *space.Entry[txtypes.Account, txtypes.Amount] + from space.Entry[txtypes.Account, txtypes.Amount] + to space.Entry[txtypes.Account, txtypes.Amount] } // Prepare prepares transaction for execution. func (t *Tx) Prepare( - space *space.Space[txtypes.Account, txtypes.Amount], + s *space.Space[txtypes.Account, txtypes.Amount], snapshotID types.SnapshotID, tx *pipeline.TransactionRequest, walRecorder *wal.Recorder, allocator *alloc.Allocator, hashBuff []byte, hashMatches []uint64, ) error { - var err error - t.from, err = space.Find(snapshotID, tx, walRecorder, allocator, t.From, hashBuff, hashMatches) - if err != nil { + if err := s.Find(&t.from, snapshotID, tx, walRecorder, allocator, t.From, space.StagePointer0, hashBuff, + hashMatches); err != nil { return err } - t.to, err = space.Find(snapshotID, tx, walRecorder, allocator, t.To, hashBuff, hashMatches) - return err + return s.Find(&t.to, snapshotID, tx, walRecorder, allocator, t.To, space.StagePointer0, hashBuff, hashMatches) } // Execute executes transaction.