diff --git a/alloc/allocator.go b/alloc/allocator.go index d127edc..7faf61a 100644 --- a/alloc/allocator.go +++ b/alloc/allocator.go @@ -98,10 +98,6 @@ type Deallocator[A Address] struct { // Deallocate deallocates single node. func (d *Deallocator[A]) Deallocate(nodeAddress A) { - if nodeAddress == 0 { - return - } - d.release = append(d.release, nodeAddress) if len(d.release) == cap(d.release) { d.sinkCh <- d.release diff --git a/alloc/state.go b/alloc/state.go index ab1e0b2..bb2b269 100644 --- a/alloc/state.go +++ b/alloc/state.go @@ -29,11 +29,9 @@ func NewState( persistentAllocationCh, singularityPersistentAddress := NewAllocationCh[types.PersistentAddress]( persistentSize, nodesPerGroup, false) - singularityNode := (*types.SingularityNode)(unsafe.Add(dataP, types.NodeLength*singularityVolatileAddress)) return &State{ nodesPerGroup: nodesPerGroup, singularityNodeRoot: types.NodeRoot{ - Hash: &singularityNode.Hash, Pointer: &types.Pointer{ Revision: 1, VolatileAddress: singularityVolatileAddress, diff --git a/benchmark_test.go b/benchmark_test.go index 981502c..f111350 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -66,7 +66,7 @@ func BenchmarkBalanceTransfer(b *testing.B) { panic(err) } - var size uint64 = 20 * 1024 * 1024 * 1024 + var size uint64 = 5 * 1024 * 1024 * 1024 state, stateDeallocFunc, err := alloc.NewState( size, store.Size(), 100, @@ -77,13 +77,10 @@ func BenchmarkBalanceTransfer(b *testing.B) { } defer stateDeallocFunc() - db, err := quantum.New(quantum.Config{ + db := quantum.New(quantum.Config{ State: state, Store: store, }) - if err != nil { - panic(err) - } ctx, cancel := context.WithCancel(logger.WithLogger(context.Background(), logger.New(logger.DefaultConfig))) b.Cleanup(cancel) diff --git a/db.go b/db.go index a7d1caf..7352b8c 100644 --- a/db.go +++ b/db.go @@ -47,73 +47,29 @@ type deallocationKey struct { } // New creates new database. -func New(config Config) (*DB, error) { - snapshotInfoNodeAssistant, err := space.NewDataNodeAssistant[types.SnapshotID, types.SnapshotInfo]() - if err != nil { - return nil, err - } - - deallocationNodeAssistant, err := space.NewDataNodeAssistant[deallocationKey, types.PersistentAddress]() - if err != nil { - return nil, err - } - +func New(config Config) *DB { queue, queueReader := pipeline.New() db := &DB{ - config: config, - txRequestFactory: pipeline.NewTransactionRequestFactory(), - singularityNode: photon.FromPointer[types.SingularityNode](config.State.Node(0)), - snapshotInfoNodeAssistant: snapshotInfoNodeAssistant, - deallocationNodeAssistant: deallocationNodeAssistant, - deallocationListsToCommit: map[types.SnapshotID]*list.Pointer{}, - queue: queue, - queueReader: queueReader, - } - - // Logical nodes might be deallocated immediately. - db.snapshots = space.New[types.SnapshotID, types.SnapshotInfo](space.Config[types.SnapshotID, types.SnapshotInfo]{ - SpaceRoot: types.NodeRoot{ - Pointer: &db.singularityNode.SnapshotRoot, - }, - State: config.State, - DataNodeAssistant: snapshotInfoNodeAssistant, - DeletionCounter: lo.ToPtr[uint64](0), - NoSnapshots: true, - }) - - db.deallocationLists = space.New[deallocationKey, types.PersistentAddress]( - space.Config[deallocationKey, types.PersistentAddress]{ - SpaceRoot: types.NodeRoot{ - Pointer: &db.snapshotInfo.DeallocationRoot, - }, - State: config.State, - DataNodeAssistant: deallocationNodeAssistant, - DeletionCounter: lo.ToPtr[uint64](0), - NoSnapshots: true, - }, - ) - - if err := db.prepareNextSnapshot(); err != nil { - return nil, err + config: config, + txRequestFactory: pipeline.NewTransactionRequestFactory(), + singularityNode: photon.FromPointer[types.SingularityNode](config.State.Node(0)), + queue: queue, + queueReader: queueReader, } - return db, nil + + db.prepareNextSnapshot() + return db } // DB represents the database. type DB struct { - config Config - txRequestFactory *pipeline.TransactionRequestFactory - singularityNode *types.SingularityNode - snapshotInfo types.SnapshotInfo - snapshots *space.Space[types.SnapshotID, types.SnapshotInfo] - deallocationLists *space.Space[deallocationKey, types.PersistentAddress] - - snapshotInfoNodeAssistant *space.DataNodeAssistant[types.SnapshotID, types.SnapshotInfo] - deallocationNodeAssistant *space.DataNodeAssistant[deallocationKey, types.PersistentAddress] + config Config + txRequestFactory *pipeline.TransactionRequestFactory + singularityNode *types.SingularityNode + snapshotInfo types.SnapshotInfo - spaceDeletionCounters [types.NumOfSpaces]uint64 - deallocationListsToCommit map[types.SnapshotID]*list.Pointer + spaceDeletionCounters [types.NumOfSpaces]uint64 queueReader *pipeline.Reader queue *pipeline.Pipeline @@ -148,7 +104,9 @@ func (db *DB) Commit() error { } db.config.State.Commit() - return db.prepareNextSnapshot() + db.prepareNextSnapshot() + + return nil } // Close tells that there will be no more operations done. @@ -175,8 +133,7 @@ func (db *DB) Run(ctx context.Context) error { prepareTxReader = pipeline.NewReader(prepareTxReader) } executeTxReader := prepareTxReader - deallocateReader := pipeline.NewReader(executeTxReader) - prevHashReader := deallocateReader + prevHashReader := executeTxReader dataHashReaders := make([]*pipeline.Reader, 0, 2) for range cap(dataHashReaders) { nextReader := pipeline.NewReader(prevHashReader) @@ -189,8 +146,7 @@ func (db *DB) Run(ctx context.Context) error { pointerHashReaders = append(pointerHashReaders, nextReader) prevHashReader = nextReader } - skipNodesAndCommitReader := pipeline.NewReader(prevHashReader) - storeNodesReader := pipeline.NewReader(skipNodesAndCommitReader) + storeNodesReader := pipeline.NewReader(prevHashReader) spawn("supervisor", parallel.Exit, func(ctx context.Context) error { var lastCommitCh chan<- error @@ -225,9 +181,6 @@ func (db *DB) Run(ctx context.Context) error { spawn("executeTx", parallel.Fail, func(ctx context.Context) error { return db.executeTransactions(ctx, executeTxReader) }) - spawn("deallocate", parallel.Fail, func(ctx context.Context) error { - return db.processDeallocations(ctx, deallocateReader) - }) for i, reader := range dataHashReaders { spawn(fmt.Sprintf("datahash-%02d", i), parallel.Fail, func(ctx context.Context) error { return db.updateDataHashes(ctx, reader, uint64(i)) @@ -238,9 +191,6 @@ func (db *DB) Run(ctx context.Context) error { return db.updatePointerHashes(ctx, reader, uint64(i)) }) } - spawn("skipNodesAndCommit", parallel.Fail, func(ctx context.Context) error { - return db.skipNodesAndCommit(ctx, skipNodesAndCommitReader) - }) spawn("storeNodes", parallel.Fail, func(ctx context.Context) error { return db.storeNodes(ctx, storeNodesReader) }) @@ -258,19 +208,21 @@ func (db *DB) deleteSnapshot( tx *pipeline.TransactionRequest, volatileAllocator *alloc.Allocator[types.VolatileAddress], persistentDeallocator *alloc.Deallocator[types.PersistentAddress], + snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo], snapshotHashBuff []byte, snapshotHashMatches []uint64, + deallocationNodeAssistant *space.DataNodeAssistant[deallocationKey, types.PersistentAddress], deallocationHashBuff []byte, deallocationHashMatches []uint64, storeReader *persistent.Reader, nodeBuff1, nodeBuff2 unsafe.Pointer, ) error { - if snapshotID == db.snapshotInfo.PreviousSnapshotID { - return errors.New("deleting latest snapshot is forbidden") + if snapshotID == db.singularityNode.LastSnapshotID-1 { + return errors.New("deleting latest persistent snapshot is forbidden") } var snapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo] - db.snapshots.Find(&snapshotInfoValue, snapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches) + snapshotSpace.Find(&snapshotInfoValue, snapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches) if exists := snapshotInfoValue.Exists(snapshotHashBuff, snapshotHashMatches); !exists { return errors.Errorf("snapshot %d to delete does not exist", snapshotID) @@ -280,7 +232,7 @@ func (db *DB) deleteSnapshot( snapshotInfoValue.Delete(tx, snapshotHashBuff, snapshotHashMatches) var nextSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo] - db.snapshots.Find(&nextSnapshotInfoValue, snapshotInfo.NextSnapshotID, space.StageData, snapshotHashBuff, + snapshotSpace.Find(&nextSnapshotInfoValue, snapshotInfo.NextSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches) if exists := nextSnapshotInfoValue.Exists(snapshotHashBuff, snapshotHashMatches); !exists { @@ -291,12 +243,17 @@ func (db *DB) deleteSnapshot( deallocationListsRoot := types.NodeRoot{ Pointer: &snapshotInfo.DeallocationRoot, } + + // FIXME (wojciech): What??? Volatile nodes of that space have been deallocated long time ago, so this code + // is a complete disaster!!! It succeeded only because volatile address pool was so huge that the nodes haven't + // been reallocated before deleting the snapshot. deallocationLists := space.New[deallocationKey, types.PersistentAddress]( space.Config[deallocationKey, types.PersistentAddress]{ SpaceRoot: deallocationListsRoot, State: db.config.State, - DataNodeAssistant: db.deallocationNodeAssistant, + DataNodeAssistant: deallocationNodeAssistant, DeletionCounter: lo.ToPtr[uint64](0), + NoSnapshots: true, }, ) @@ -306,7 +263,7 @@ func (db *DB) deleteSnapshot( for nextDeallocSnapshot := range space.PersistentIteratorAndDeallocator( nextSnapshotInfo.DeallocationRoot, storeReader, - db.deallocationNodeAssistant, + deallocationNodeAssistant, persistentDeallocator, nodeBuff1, &err, @@ -353,7 +310,7 @@ func (db *DB) deleteSnapshot( if snapshotInfo.PreviousSnapshotID > 0 { var previousSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo] - db.snapshots.Find(&previousSnapshotInfoValue, snapshotInfo.PreviousSnapshotID, space.StageData, + snapshotSpace.Find(&previousSnapshotInfoValue, snapshotInfo.PreviousSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches) if exists := previousSnapshotInfoValue.Exists(snapshotHashBuff, snapshotHashMatches); !exists { @@ -379,28 +336,45 @@ func (db *DB) deleteSnapshot( func (db *DB) commit( tx *pipeline.TransactionRequest, + deallocationListsToCommit map[types.SnapshotID]*list.Pointer, volatileAllocator *alloc.Allocator[types.VolatileAddress], volatileDeallocator *alloc.Deallocator[types.VolatileAddress], persistentAllocator *alloc.Allocator[types.PersistentAddress], + persistentDeallocator *alloc.Deallocator[types.PersistentAddress], + snapshotSpace *space.Space[types.SnapshotID, types.SnapshotInfo], snapshotHashBuff []byte, snapshotHashMatches []uint64, + deallocationListSpace *space.Space[deallocationKey, types.PersistentAddress], deallocationHashBuff []byte, deallocationHashMatches []uint64, ) error { - lastSr := tx.LastStoreRequest + // Deallocations done in this function are done after standard deallocation are processed by the transaction + // execution goroutine. Deallocations done here don't go to deallocation lists but are deallocated immediately. + + db.snapshotInfo.PreviousSnapshotID = db.singularityNode.LastSnapshotID - 1 + db.snapshotInfo.NextSnapshotID = db.singularityNode.LastSnapshotID + 1 + db.snapshotInfo.DeallocationRoot = types.Pointer{} + //nolint:nestif - if len(db.deallocationListsToCommit) > 0 { - lists := make([]types.SnapshotID, 0, len(db.deallocationListsToCommit)) - for snapshotID := range db.deallocationListsToCommit { + if len(deallocationListsToCommit) > 0 { + lastSr := tx.LastStoreRequest + + lists := make([]types.SnapshotID, 0, len(deallocationListsToCommit)) + for snapshotID := range deallocationListsToCommit { lists = append(lists, snapshotID) } sort.Slice(lists, func(i, j int) bool { return lists[i] < lists[j] }) var lr *pipeline.ListRequest for _, snapshotID := range lists { - listRoot := db.deallocationListsToCommit[snapshotID] + listRoot := deallocationListsToCommit[snapshotID] + + // It is safe to deallocate volatile nodes here despite they are used to write persistent nodes later. + // It's because deallocated nodes are never reallocated until current commit is finalized. + volatileDeallocator.Deallocate(listRoot.VolatileAddress) + var deallocationListValue space.Entry[deallocationKey, types.PersistentAddress] - db.deallocationLists.Find(&deallocationListValue, deallocationKey{ + deallocationListSpace.Find(&deallocationListValue, deallocationKey{ ListSnapshotID: db.singularityNode.LastSnapshotID, SnapshotID: snapshotID, }, space.StageData, deallocationHashBuff, @@ -429,9 +403,11 @@ func (db *DB) commit( tx.AddListRequest(lr) } + clear(deallocationListsToCommit) + for sr := *lastSr; sr != nil; sr = sr.Next { for i := range sr.PointersToStore { - if sr.Store[i].Pointer.SnapshotID != db.singularityNode.LastSnapshotID { + if sr.Store[i].Pointer.SnapshotID == 0 { persistentAddr, err := persistentAllocator.Allocate() if err != nil { return err @@ -444,15 +420,15 @@ func (db *DB) commit( } } - lastSr = tx.LastStoreRequest - // It is safe to deallocate volatile nodes here despite they are used to write persistent nodes later. // It's because deallocated nodes are never reallocated until current commit is finalized. space.DeallocateVolatile(db.snapshotInfo.DeallocationRoot.VolatileAddress, volatileDeallocator, db.config.State) } + lastSr := tx.LastStoreRequest + var nextSnapshotInfoValue space.Entry[types.SnapshotID, types.SnapshotInfo] - db.snapshots.Find(&nextSnapshotInfoValue, db.singularityNode.LastSnapshotID, space.StageData, snapshotHashBuff, + snapshotSpace.Find(&nextSnapshotInfoValue, db.singularityNode.LastSnapshotID, space.StageData, snapshotHashBuff, snapshotHashMatches) if err := nextSnapshotInfoValue.Set( tx, @@ -467,6 +443,9 @@ func (db *DB) commit( for sr := *lastSr; sr != nil; sr = sr.Next { for i := range sr.PointersToStore { if sr.Store[i].Pointer.SnapshotID != db.singularityNode.LastSnapshotID { + if sr.Store[i].Pointer.PersistentAddress != 0 { + persistentDeallocator.Deallocate(sr.Store[i].Pointer.PersistentAddress) + } persistentAddr, err := persistentAllocator.Allocate() if err != nil { return err @@ -479,10 +458,12 @@ func (db *DB) commit( } } - sr := &pipeline.StoreRequest{} + sr := &pipeline.StoreRequest{ + PointersToStore: 1, + NoSnapshots: true, + } sr.Store[0] = db.config.State.SingularityNodeRoot() sr.Store[0].Pointer.Revision = uintptr(unsafe.Pointer(sr)) - sr.PointersToStore = 1 tx.AddStoreRequest(sr) return nil @@ -524,6 +505,8 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read defer storeReader.Close() volatileAllocator := db.config.State.NewVolatileAllocator() + volatileDeallocator := db.config.State.NewVolatileDeallocator() + persistentAllocator := db.config.State.NewPersistentAllocator() persistentDeallocator := db.config.State.NewPersistentDeallocator() // We use allocator to allocate this region to get aligned address effortlessly. @@ -538,6 +521,40 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read nodeBuff1 := db.config.State.Node(nodeBuff1Address) nodeBuff2 := db.config.State.Node(nodeBuff2Address) + snapshotInfoNodeAssistant, err := space.NewDataNodeAssistant[types.SnapshotID, types.SnapshotInfo]() + if err != nil { + return err + } + + snapshotSpace := space.New[types.SnapshotID, types.SnapshotInfo](space.Config[types.SnapshotID, types.SnapshotInfo]{ + SpaceRoot: types.NodeRoot{ + Pointer: &db.singularityNode.SnapshotRoot, + }, + State: db.config.State, + DataNodeAssistant: snapshotInfoNodeAssistant, + DeletionCounter: lo.ToPtr[uint64](0), + NoSnapshots: true, + }) + + deallocationListsToCommit := map[types.SnapshotID]*list.Pointer{} + + deallocationNodeAssistant, err := space.NewDataNodeAssistant[deallocationKey, types.PersistentAddress]() + if err != nil { + return err + } + + deallocationListSpace := space.New[deallocationKey, types.PersistentAddress]( + space.Config[deallocationKey, types.PersistentAddress]{ + SpaceRoot: types.NodeRoot{ + Pointer: &db.snapshotInfo.DeallocationRoot, + }, + State: db.config.State, + DataNodeAssistant: deallocationNodeAssistant, + DeletionCounter: lo.ToPtr[uint64](0), + NoSnapshots: true, + }, + ) + s, err := GetSpace[txtypes.Account, txtypes.Amount](spaces.Balances, db) if err != nil { return err @@ -545,10 +562,10 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read hashBuff := s.NewHashBuff() hashMatches := s.NewHashMatches() - snapshotHashBuff := db.snapshots.NewHashBuff() - snapshotHashMatches := db.snapshots.NewHashMatches() - deallocationHashBuff := db.deallocationLists.NewHashBuff() - deallocationHashMatches := db.deallocationLists.NewHashMatches() + snapshotHashBuff := snapshotSpace.NewHashBuff() + snapshotHashMatches := snapshotSpace.NewHashMatches() + deallocationHashBuff := deallocationListSpace.NewHashBuff() + deallocationHashMatches := deallocationListSpace.NewHashMatches() for processedCount := uint64(0); ; processedCount++ { req, err := pipeReader.Read(ctx) @@ -564,8 +581,9 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read } case *deleteSnapshotTx: if err := db.deleteSnapshot(tx.SnapshotID, req, volatileAllocator, - persistentDeallocator, snapshotHashBuff, snapshotHashMatches, deallocationHashBuff, - deallocationHashMatches, storeReader, nodeBuff1, nodeBuff2); err != nil { + persistentDeallocator, snapshotSpace, snapshotHashBuff, snapshotHashMatches, + deallocationNodeAssistant, deallocationHashBuff, deallocationHashMatches, storeReader, nodeBuff1, + nodeBuff2); err != nil { return err } case *genesis.Tx: @@ -577,21 +595,6 @@ func (db *DB) executeTransactions(ctx context.Context, pipeReader *pipeline.Read } } - pipeReader.Acknowledge(processedCount+1, req) - } -} - -func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Reader) error { - volatileAllocator := db.config.State.NewVolatileAllocator() - persistentAllocator := db.config.State.NewPersistentAllocator() - persistentDeallocator := db.config.State.NewPersistentDeallocator() - - for processedCount := uint64(0); ; processedCount++ { - req, err := pipeReader.Read(ctx) - if err != nil { - return err - } - var lr *pipeline.ListRequest for sr := req.StoreRequest; sr != nil; sr = sr.Next { if sr.PointersToStore == 0 { @@ -606,12 +609,16 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea if root.Pointer.SnapshotID != db.singularityNode.LastSnapshotID { if root.Pointer.PersistentAddress != 0 { listNodePointer, err := db.deallocateNode(root.Pointer.SnapshotID, root.Pointer.PersistentAddress, - volatileAllocator, persistentAllocator, persistentDeallocator, sr.NoSnapshots) + deallocationListsToCommit, volatileAllocator, persistentAllocator, persistentDeallocator) if err != nil { return err } if listNodePointer.VolatileAddress != types.FreeAddress { + // It is safe to deallocate volatile nodes here despite they are used to write persistent nodes later. + // It's because deallocated nodes are never reallocated until current commit is finalized. + volatileDeallocator.Deallocate(listNodePointer.VolatileAddress) + if lr == nil { lr = &pipeline.ListRequest{} } @@ -643,6 +650,14 @@ func (db *DB) processDeallocations(ctx context.Context, pipeReader *pipeline.Rea req.AddListRequest(lr) } + if req.Type == pipeline.Commit { + if err := db.commit(req, deallocationListsToCommit, volatileAllocator, volatileDeallocator, + persistentAllocator, persistentDeallocator, snapshotSpace, snapshotHashBuff, snapshotHashMatches, + deallocationListSpace, deallocationHashBuff, deallocationHashMatches); err != nil { + return err + } + } + pipeReader.Acknowledge(processedCount+1, req) } } @@ -863,47 +878,6 @@ func (db *DB) updatePointerHashes( } } -func (db *DB) skipNodesAndCommit(ctx context.Context, pipeReader *pipeline.Reader) error { - volatileAllocator := db.config.State.NewVolatileAllocator() - volatileDeallocator := db.config.State.NewVolatileDeallocator() - persistentAllocator := db.config.State.NewPersistentAllocator() - snapshotHashBuff := db.snapshots.NewHashBuff() - snapshotHashMatches := db.snapshots.NewHashMatches() - deallocationHashBuff := db.deallocationLists.NewHashBuff() - deallocationHashMatches := db.deallocationLists.NewHashMatches() - - for processedCount := uint64(0); ; processedCount++ { - req, err := pipeReader.Read(ctx) - if err != nil { - return err - } - - prevPointer := &req.StoreRequest - for sr := req.StoreRequest; sr != nil; sr = sr.Next { - for i := sr.PointersToStore - 1; i >= 0; i-- { - if sr.Store[i].Pointer.Revision != uintptr(unsafe.Pointer(sr)) { - sr.PointersLast = i + 1 - break - } - } - if sr.PointersLast == sr.PointersToStore { - *prevPointer = sr.Next - } else { - prevPointer = &sr.Next - } - } - - if req.Type == pipeline.Commit { - if err := db.commit(req, volatileAllocator, volatileDeallocator, persistentAllocator, snapshotHashBuff, - snapshotHashMatches, deallocationHashBuff, deallocationHashMatches); err != nil { - return err - } - } - - pipeReader.Acknowledge(processedCount+1, req) - } -} - func (db *DB) storeNodes(ctx context.Context, pipeReader *pipeline.Reader) error { storeWriter, err := db.config.Store.NewWriter(db.config.State.Origin(), db.config.State.VolatileSize()) @@ -912,8 +886,6 @@ func (db *DB) storeNodes(ctx context.Context, pipeReader *pipeline.Reader) error } defer storeWriter.Close() - volatileDeallocator := db.config.State.NewVolatileDeallocator() - for processedCount := uint64(0); ; processedCount++ { req, err := pipeReader.Read(ctx) if err != nil { @@ -925,7 +897,6 @@ func (db *DB) storeNodes(ctx context.Context, pipeReader *pipeline.Reader) error if err := storeWriter.Write(lr.List[i].PersistentAddress, lr.List[i].VolatileAddress); err != nil { return err } - volatileDeallocator.Deallocate(lr.List[i].VolatileAddress) } } @@ -952,40 +923,35 @@ func (db *DB) storeNodes(ctx context.Context, pipeReader *pipeline.Reader) error func (db *DB) deallocateNode( nodeSnapshotID types.SnapshotID, nodeAddress types.PersistentAddress, + deallocationListsToCommit map[types.SnapshotID]*list.Pointer, volatileAllocator *alloc.Allocator[types.VolatileAddress], persistentAllocator *alloc.Allocator[types.PersistentAddress], persistentDeallocator *alloc.Deallocator[types.PersistentAddress], - immediateDeallocation bool, ) (list.Pointer, error) { - if nodeSnapshotID > db.snapshotInfo.PreviousSnapshotID || immediateDeallocation { + // Latest persistent snapshot cannot be deleted, so there is no gap between that snapshot and the pending one. + // It means the condition here don't need to include snapshot IDs greater than the previous snapshot ID. + if nodeSnapshotID == db.singularityNode.LastSnapshotID { persistentDeallocator.Deallocate(nodeAddress) return list.Pointer{}, nil } - listRoot := db.deallocationListsToCommit[nodeSnapshotID] + listRoot := deallocationListsToCommit[nodeSnapshotID] if listRoot == nil { listRoot = &list.Pointer{} - db.deallocationListsToCommit[nodeSnapshotID] = listRoot + deallocationListsToCommit[nodeSnapshotID] = listRoot } return list.Add(listRoot, nodeAddress, db.config.State, volatileAllocator, persistentAllocator) } -func (db *DB) prepareNextSnapshot() error { - db.snapshotInfo.PreviousSnapshotID = db.singularityNode.LastSnapshotID +func (db *DB) prepareNextSnapshot() { db.singularityNode.LastSnapshotID++ - db.snapshotInfo.NextSnapshotID = db.singularityNode.LastSnapshotID + 1 - db.snapshotInfo.DeallocationRoot = types.Pointer{} - - clear(db.deallocationListsToCommit) - - return nil } // GetSpace retrieves space from snapshot. func GetSpace[K, V comparable](spaceID types.SpaceID, db *DB) (*space.Space[K, V], error) { - if spaceID >= types.SpaceID(len(db.snapshotInfo.Spaces)) { + if spaceID >= types.NumOfSpaces { return nil, errors.Errorf("space %d is not defined", spaceID) } diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index c757dc8..e65f9e2 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -81,8 +81,8 @@ func (t *TransactionRequest) AddListRequest(lr *ListRequest) { type StoreRequest struct { NoSnapshots bool PointersToStore int8 - PointersLast int8 Store [StoreCapacity]types.NodeRoot + PointersLast int8 Next *StoreRequest } diff --git a/space/space.go b/space/space.go index d9c500b..c791f5a 100644 --- a/space/space.go +++ b/space/space.go @@ -416,6 +416,7 @@ func (s *Space[K, V]) splitDataNodeWithoutConflict( continue } + // FIXME (wojciech): Items might be inserted on the first free slot in the new node. newDataNodeItem := s.config.DataNodeAssistant.Item(newDataNode, s.config.DataNodeAssistant.ItemOffset(i)) *newDataNodeItem = *item newKeyHashes[i] = keyHashes[i] @@ -483,6 +484,7 @@ func (s *Space[K, V]) splitDataNodeWithConflict( continue } + // FIXME (wojciech): Items might be inserted on the first free slot in the new node. newDataNodeItem := s.config.DataNodeAssistant.Item(newDataNode, s.config.DataNodeAssistant.ItemOffset(i)) *newDataNodeItem = *item newKeyHashes[i] = keyHash @@ -622,8 +624,7 @@ func (s *Space[K, V]) walkDataItems(v *Entry[K, V], hashMatches []uint64) bool { var conflict bool node := s.config.State.Node(types.Load( - &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress), - ) + &v.storeRequest.Store[v.storeRequest.PointersToStore-1].Pointer.VolatileAddress)) keyHashes := s.config.DataNodeAssistant.KeyHashes(node) zeroIndex, numOfMatches := compare.Compare(uint64(v.keyHash), (*uint64)(&keyHashes[0]), &hashMatches[0], diff --git a/types/types.go b/types/types.go index 42a883a..33c92f5 100644 --- a/types/types.go +++ b/types/types.go @@ -143,7 +143,6 @@ type SnapshotInfo struct { // SingularityNode is the root of the store. type SingularityNode struct { - Hash Hash LastSnapshotID SnapshotID SnapshotRoot Pointer }