diff --git a/alloc/state.go b/alloc/state.go index d591210..4102813 100644 --- a/alloc/state.go +++ b/alloc/state.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" + "github.com/outofforest/photon" "github.com/outofforest/quantum/types" ) @@ -77,6 +78,11 @@ func (s *State) Node(nodeAddress types.LogicalAddress) unsafe.Pointer { return unsafe.Add(s.dataP, nodeAddress) } +// Bytes returns byte slice of a node. +func (s *State) Bytes(nodeAddress types.LogicalAddress) []byte { + return photon.SliceFromPointer[byte](s.Node(nodeAddress), int(s.nodeSize)) +} + // Run runs node eraser. func (s *State) Run(ctx context.Context) error { return RunEraser(ctx, s.deallocationCh, s.allocationCh, s.nodeSize, s, s.numOfEraseWorkers) diff --git a/benchmark_test.go b/benchmark_test.go index 29e9b27..f3bb277 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -11,6 +11,7 @@ import ( "github.com/outofforest/logger" "github.com/outofforest/parallel" "github.com/outofforest/quantum/alloc" + "github.com/outofforest/quantum/persistent" "github.com/outofforest/quantum/types" ) @@ -37,8 +38,9 @@ func BenchmarkBalanceTransfer(b *testing.B) { for bi := 0; bi < b.N; bi++ { func() { + var size uint64 = 20 * 1024 * 1024 * 1024 state, stateDeallocFunc, err := alloc.NewState( - 70*1024*1024*1024, + size, 4*1024, 1024, true, @@ -51,8 +53,15 @@ func BenchmarkBalanceTransfer(b *testing.B) { pool := state.NewPool() + store, storeDeallocFunc, err := persistent.NewMemoryStore(size, true) + if err != nil { + panic(err) + } + defer storeDeallocFunc() + db, err := New(Config{ State: state, + Store: store, }) if err != nil { panic(err) diff --git a/db.go b/db.go index 273efec..3f19ed3 100644 --- a/db.go +++ b/db.go @@ -3,6 +3,7 @@ package quantum import ( "context" "sort" + "sync/atomic" "github.com/pkg/errors" "github.com/samber/lo" @@ -12,6 +13,7 @@ import ( "github.com/outofforest/photon" "github.com/outofforest/quantum/alloc" "github.com/outofforest/quantum/list" + "github.com/outofforest/quantum/persistent" "github.com/outofforest/quantum/space" "github.com/outofforest/quantum/types" ) @@ -19,6 +21,7 @@ import ( // Config stores snapshot configuration. type Config struct { State *alloc.State + Store persistent.Store } // SpaceToCommit represents requested space which might require to be committed. @@ -94,7 +97,7 @@ func New(config Config) (*DB, error) { spacesToCommit: map[types.SpaceID]SpaceToCommit{}, deallocationListsToCommit: map[types.SnapshotID]ListToCommit{}, availableSnapshots: map[types.SnapshotID]struct{}{}, - storageEventCh: make(chan any, 100), + eventCh: make(chan any, 100), doneCh: make(chan struct{}), } @@ -108,7 +111,7 @@ func New(config Config) (*DB, error) { PointerNodeAllocator: pointerNodeAllocator, DataNodeAllocator: snapshotInfoNodeAllocator, MassEntry: mass.New[space.Entry[types.SnapshotID, types.SnapshotInfo]](1000), - StorageEventCh: db.storageEventCh, + StorageEventCh: db.eventCh, }) db.spaces = space.New[types.SpaceID, types.SpaceInfo](space.Config[types.SpaceID, types.SpaceInfo]{ @@ -121,7 +124,7 @@ func New(config Config) (*DB, error) { PointerNodeAllocator: pointerNodeAllocator, DataNodeAllocator: spaceInfoNodeAllocator, MassEntry: mass.New[space.Entry[types.SpaceID, types.SpaceInfo]](1000), - StorageEventCh: db.storageEventCh, + StorageEventCh: db.eventCh, }) db.deallocationLists = space.New[types.SnapshotID, types.Pointer]( @@ -135,7 +138,7 @@ func New(config Config) (*DB, error) { PointerNodeAllocator: pointerNodeAllocator, DataNodeAllocator: snapshotToNodeNodeAllocator, MassEntry: mass.New[space.Entry[types.SnapshotID, types.Pointer]](1000), - StorageEventCh: db.storageEventCh, + StorageEventCh: db.eventCh, }, ) @@ -172,8 +175,8 @@ type DB struct { deallocationListsToCommit map[types.SnapshotID]ListToCommit availableSnapshots map[types.SnapshotID]struct{} - storageEventCh chan any - doneCh chan struct{} + eventCh chan any + doneCh chan struct{} } // DeleteSnapshot deletes snapshot. @@ -211,7 +214,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types PointerNodeAllocator: db.pointerNodeAllocator, DataNodeAllocator: db.snapshotToNodeNodeAllocator, MassEntry: db.massSnapshotToNodeEntry, - StorageEventCh: db.storageEventCh, + StorageEventCh: db.eventCh, }, ) } else { @@ -232,7 +235,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types continue } - db.storageEventCh <- types.ListDeallocationEvent{ + db.eventCh <- types.ListDeallocationEvent{ ListRoot: snapshotItem.Value, } @@ -265,7 +268,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types ListRoot: &newNextListNodeAddress, State: db.config.State, NodeAllocator: db.listNodeAllocator, - StorageEventCh: db.storageEventCh, + StorageEventCh: db.eventCh, }) if err != nil { return err @@ -285,7 +288,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types } } - db.storageEventCh <- types.SpaceDeallocationEvent{ + db.eventCh <- types.SpaceDeallocationEvent{ SpaceRoot: deallocationListsRoot, } @@ -340,7 +343,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types // Commit commits current snapshot and returns next one. func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error { doneCh := make(chan struct{}) - db.storageEventCh <- types.DBCommitEvent{ + db.eventCh <- types.DBCommitEvent{ DoneCh: doneCh, } <-doneCh @@ -383,7 +386,7 @@ func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error { } doneCh = make(chan struct{}) - db.storageEventCh <- types.DBCommitEvent{ + db.eventCh <- types.DBCommitEvent{ DoneCh: doneCh, } <-doneCh @@ -397,7 +400,7 @@ func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error { // Close closed DB. func (db *DB) Close() { - close(db.storageEventCh) + close(db.eventCh) db.config.State.Close() <-db.doneCh @@ -408,33 +411,76 @@ func (db *DB) Run(ctx context.Context) error { defer close(db.doneCh) return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error { - spawn("state", parallel.Fail, db.config.State.Run) - spawn("storageEvents", parallel.Continue, func(ctx context.Context) error { - return db.processStorageEvents(ctx, db.storageEventCh) + storeRequestCh := make(chan types.StoreRequest, 500) + + spawn("state", parallel.Continue, db.config.State.Run) + spawn("events", parallel.Continue, func(ctx context.Context) error { + defer close(storeRequestCh) + return db.processEvents(ctx, db.eventCh, storeRequestCh) + }) + spawn("storeRequests", parallel.Continue, func(ctx context.Context) error { + return db.processStoreRequests(ctx, storeRequestCh) }) return nil }) } -func (db *DB) processStorageEvents(ctx context.Context, storageEventCh <-chan any) error { +func (db *DB) processEvents( + ctx context.Context, + eventCh <-chan any, + storeRequestCh chan<- types.StoreRequest, +) error { volatilePool := db.config.State.NewPool() persistentPool := alloc.NewPool[types.PhysicalAddress](db.persistentAllocationCh, db.persistentAllocationCh) pointerNode := db.pointerNodeAllocator.NewNode() + parentPointerNode := db.pointerNodeAllocator.NewNode() listNode := db.listNodeAllocator.NewNode() - for event := range storageEventCh { + for event := range eventCh { switch e := event.(type) { case types.SpacePointerNodeAllocatedEvent: - pointerNodeAddress := e.Pointer.LogicalAddress - for pointerNodeAddress != 0 { - header := photon.FromPointer[space.PointerNodeHeader](db.config.State.Node(pointerNodeAddress)) - if header.RevisionHeader.SnapshotID == db.singularityNode.LastSnapshotID { + db.pointerNodeAllocator.Get(e.NodeAddress, pointerNode) + + for { + var pointer *types.Pointer + if pointerNode.Header.ParentNodeAddress == 0 { + pointer = e.RootPointer + } else { + db.pointerNodeAllocator.Get(pointerNode.Header.ParentNodeAddress, parentPointerNode) + spacePointer, _ := parentPointerNode.Item(pointerNode.Header.ParentNodeIndex) + pointer = &spacePointer.Pointer + } + + revision := atomic.AddUint64(&pointerNode.Header.RevisionHeader.Revision, 1) + + if pointerNode.Header.RevisionHeader.SnapshotID != db.singularityNode.LastSnapshotID { + pointerNode.Header.RevisionHeader.SnapshotID = db.singularityNode.LastSnapshotID + pointer.PhysicalAddress = 0 + } + + terminate := true + if pointer.PhysicalAddress == 0 { + var err error + pointer.PhysicalAddress, err = persistentPool.Allocate() + if err != nil { + return err + } + + terminate = pointer == e.RootPointer + } + + storeRequestCh <- types.StoreRequest{ + Revision: revision, + Pointer: pointer, + } + + if terminate { break } - header.RevisionHeader.SnapshotID = db.singularityNode.LastSnapshotID - pointerNodeAddress = header.ParentNodeAddress + + pointerNode = parentPointerNode } case types.SpaceDataNodeAllocatedEvent: header := photon.FromPointer[space.DataNodeHeader](db.config.State.Node(e.Pointer.LogicalAddress)) @@ -492,12 +538,44 @@ func (db *DB) processStorageEvents(ctx context.Context, storageEventCh <-chan an listNode, ) case types.DBCommitEvent: + ch := make(chan struct{}, 1) + storeRequestCh <- types.StoreRequest{ + DoneCh: ch, + } + <-ch close(e.DoneCh) } } return errors.WithStack(ctx.Err()) } +func (db *DB) processStoreRequests(ctx context.Context, storeRequestCh <-chan types.StoreRequest) error { + for req := range storeRequestCh { + if req.Pointer != nil { + header := photon.FromPointer[types.RevisionHeader](db.config.State.Node(req.Pointer.LogicalAddress)) + revision := atomic.LoadUint64(&header.Revision) + + if revision != req.Revision { + continue + } + + if err := db.config.Store.Write( + req.Pointer.PhysicalAddress, + db.config.State.Bytes(req.Pointer.LogicalAddress), + ); err != nil { + return err + } + + // TODO: Store node + + continue + } + + req.DoneCh <- struct{}{} + } + return errors.WithStack(ctx.Err()) +} + func (db *DB) prepareNextSnapshot() error { var snapshotID types.SnapshotID if db.singularityNode.SnapshotRoot.State != types.StateFree { @@ -564,7 +642,7 @@ func GetSpace[K, V comparable](spaceID types.SpaceID, db *DB) (*space.Space[K, V HashMod: s.HashMod, SpaceRoot: s.PInfo, State: db.config.State, - StorageEventCh: db.storageEventCh, + StorageEventCh: db.eventCh, PointerNodeAllocator: db.pointerNodeAllocator, DataNodeAllocator: dataNodeAllocator, MassEntry: mass.New[space.Entry[K, V]](1000), diff --git a/persistent/memory.go b/persistent/memory.go index 4bbd2b7..1fe184a 100644 --- a/persistent/memory.go +++ b/persistent/memory.go @@ -4,6 +4,8 @@ import ( "syscall" "github.com/pkg/errors" + + "github.com/outofforest/quantum/types" ) // NewMemoryStore creates new in-memory "persistent" store. @@ -29,13 +31,8 @@ type MemoryStore struct { data []byte } -// Size returns size of the store. -func (s *MemoryStore) Size() uint64 { - return uint64(len(s.data)) -} - // Write writes data to the store. -func (s *MemoryStore) Write(offset uint64, data []byte) error { - copy(s.data[offset:], data) +func (s *MemoryStore) Write(address types.PhysicalAddress, data []byte) error { + copy(s.data[address:], data) return nil } diff --git a/persistent/types.go b/persistent/types.go index 4ab9a3c..ee71c5e 100644 --- a/persistent/types.go +++ b/persistent/types.go @@ -1,7 +1,8 @@ package persistent +import "github.com/outofforest/quantum/types" + // Store defines the interface of the store. type Store interface { - Size() uint64 - Write(offset uint64, data []byte) error + Write(address types.PhysicalAddress, data []byte) error } diff --git a/space/alloc.go b/space/alloc.go index c24b2bb..c9d2040 100644 --- a/space/alloc.go +++ b/space/alloc.go @@ -85,6 +85,11 @@ func (na *NodeAllocator[H, T]) Allocate( return nodeAddress, nil } +// Index returns index from hash. +func (na *NodeAllocator[H, T]) Index(hash types.Hash) uintptr { + return uintptr(hash) % na.numOfItems +} + // Shift shifts bits in hash. func (na *NodeAllocator[H, T]) Shift(hash types.Hash) types.Hash { return hash / types.Hash(na.numOfItems) @@ -100,6 +105,7 @@ func (na *NodeAllocator[H, T]) project(nodeP unsafe.Pointer, node *Node[H, T]) { type PointerNodeHeader struct { RevisionHeader types.RevisionHeader ParentNodeAddress types.LogicalAddress + ParentNodeIndex uintptr HashMod uint64 } @@ -118,9 +124,8 @@ type Node[H, T comparable] struct { itemsP unsafe.Pointer } -// ItemByHash returns pointers to the item and its state by hash. -func (sn *Node[H, T]) ItemByHash(hash types.Hash) (*T, *types.State) { - index := uintptr(hash) % sn.numOfItems +// Item returns pointers to the item and its state by index. +func (sn *Node[H, T]) Item(index uintptr) (*T, *types.State) { return (*T)(unsafe.Add(sn.itemsP, sn.itemSize*index)), (*types.State)(unsafe.Add(sn.statesP, index)) } diff --git a/space/space.go b/space/space.go index ba27665..20d4667 100644 --- a/space/space.go +++ b/space/space.go @@ -115,6 +115,7 @@ type pointerToAllocate struct { Level uint64 PEntry types.ParentEntry PAddress types.LogicalAddress + PIndex uintptr } // AllocatePointers allocates specified levels of pointer nodes. @@ -150,19 +151,22 @@ func (s *Space[K, V]) AllocatePointers( return err } pointerNode.Header.ParentNodeAddress = pToAllocate.PAddress + pointerNode.Header.ParentNodeIndex = pToAllocate.PIndex *pToAllocate.PEntry.State = types.StatePointer pToAllocate.PEntry.SpacePointer.Pointer.LogicalAddress = pointerNodeAddress - s.config.StorageEventCh <- types.SpacePointerNodeAllocatedEvent{ - Pointer: &pToAllocate.PEntry.SpacePointer.Pointer, - } - if pToAllocate.Level == levels { + s.config.StorageEventCh <- types.SpacePointerNodeAllocatedEvent{ + NodeAddress: pointerNodeAddress, + RootPointer: &s.config.SpaceRoot.SpacePointer.Pointer, + } + continue } pToAllocate.Level++ + var index uintptr for item, state := range pointerNode.Iterator() { stack = append(stack, pointerToAllocate{ Level: pToAllocate.Level, @@ -171,7 +175,10 @@ func (s *Space[K, V]) AllocatePointers( SpacePointer: item, }, PAddress: pointerNodeAddress, + PIndex: index, }) + + index++ } } } @@ -330,7 +337,7 @@ func (s *Space[K, V]) set( *v.pEntry.State = types.StateData v.pEntry.SpacePointer.Pointer.LogicalAddress = dataNodeAddress - item, state := dataNode.ItemByHash(v.item.Hash + 1) + item, state := dataNode.Item(s.config.DataNodeAllocator.Index(v.item.Hash + 1)) *state = types.StateData *item = v.item @@ -349,7 +356,7 @@ func (s *Space[K, V]) set( var conflict bool for i := types.Hash(0); i < trials; i++ { - item, state := dataNode.ItemByHash(v.item.Hash + 1<