Store preallocated pointer nodes persistently (#72)
outofforest authored Oct 21, 2024
1 parent ae156d1 commit f542a40
Showing 8 changed files with 170 additions and 50 deletions.
6 changes: 6 additions & 0 deletions alloc/state.go
@@ -7,6 +7,7 @@ import (

"github.com/pkg/errors"

"github.com/outofforest/photon"
"github.com/outofforest/quantum/types"
)

@@ -77,6 +78,11 @@ func (s *State) Node(nodeAddress types.LogicalAddress) unsafe.Pointer {
return unsafe.Add(s.dataP, nodeAddress)
}

// Bytes returns byte slice of a node.
func (s *State) Bytes(nodeAddress types.LogicalAddress) []byte {
return photon.SliceFromPointer[byte](s.Node(nodeAddress), int(s.nodeSize))
}

// Run runs node eraser.
func (s *State) Run(ctx context.Context) error {
return RunEraser(ctx, s.deallocationCh, s.allocationCh, s.nodeSize, s, s.numOfEraseWorkers)
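
The new Bytes helper pairs the raw node pointer from Node with the configured node size, so a caller can hand a whole node to a byte-oriented writer. A minimal sketch of such a caller, assuming the persistent.Store interface introduced later in this commit (the writeNode helper itself is illustrative, not part of the change):

package example

import (
    "github.com/outofforest/quantum/alloc"
    "github.com/outofforest/quantum/persistent"
    "github.com/outofforest/quantum/types"
)

// writeNode copies a node's in-memory bytes to a persistent store.
// Illustrative only: the helper and its parameters are assumptions.
func writeNode(
    state *alloc.State,
    store persistent.Store,
    logical types.LogicalAddress,
    physical types.PhysicalAddress,
) error {
    // State.Bytes wraps State.Node(logical) in a slice of the node size.
    return store.Write(physical, state.Bytes(logical))
}
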
11 changes: 10 additions & 1 deletion benchmark_test.go
@@ -11,6 +11,7 @@ import (
"github.com/outofforest/logger"
"github.com/outofforest/parallel"
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/persistent"
"github.com/outofforest/quantum/types"
)

@@ -37,8 +38,9 @@ func BenchmarkBalanceTransfer(b *testing.B) {

for bi := 0; bi < b.N; bi++ {
func() {
var size uint64 = 20 * 1024 * 1024 * 1024
state, stateDeallocFunc, err := alloc.NewState(
70*1024*1024*1024,
size,
4*1024,
1024,
true,
@@ -51,8 +53,15 @@ func BenchmarkBalanceTransfer(b *testing.B) {

pool := state.NewPool()

store, storeDeallocFunc, err := persistent.NewMemoryStore(size, true)
if err != nil {
panic(err)
}
defer storeDeallocFunc()

db, err := New(Config{
State: state,
Store: store,
})
if err != nil {
panic(err)
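
Taken together, the benchmark changes size an in-memory "persistent" store to match the volatile state and hand both to the DB. A condensed sketch of that wiring, assuming the root package quantum (as db.go's package clause shows) and an already-constructed *alloc.State; the helper name is hypothetical:

package quantum

import (
    "github.com/outofforest/quantum/alloc"
    "github.com/outofforest/quantum/persistent"
)

// newBenchmarkDB condenses the setup used by the benchmark: an in-memory
// store of the same size as the state backs the DB. Illustrative only.
func newBenchmarkDB(state *alloc.State, size uint64) (*DB, func(), error) {
    store, storeDeallocFunc, err := persistent.NewMemoryStore(size, true)
    if err != nil {
        return nil, nil, err
    }

    db, err := New(Config{
        State: state,
        Store: store,
    })
    if err != nil {
        storeDeallocFunc()
        return nil, nil, err
    }
    return db, storeDeallocFunc, nil
}
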
128 changes: 103 additions & 25 deletions db.go
@@ -3,6 +3,7 @@ package quantum
import (
"context"
"sort"
"sync/atomic"

"github.com/pkg/errors"
"github.com/samber/lo"
@@ -12,13 +13,15 @@ import (
"github.com/outofforest/photon"
"github.com/outofforest/quantum/alloc"
"github.com/outofforest/quantum/list"
"github.com/outofforest/quantum/persistent"
"github.com/outofforest/quantum/space"
"github.com/outofforest/quantum/types"
)

// Config stores snapshot configuration.
type Config struct {
State *alloc.State
Store persistent.Store
}

// SpaceToCommit represents requested space which might require to be committed.
@@ -94,7 +97,7 @@ func New(config Config) (*DB, error) {
spacesToCommit: map[types.SpaceID]SpaceToCommit{},
deallocationListsToCommit: map[types.SnapshotID]ListToCommit{},
availableSnapshots: map[types.SnapshotID]struct{}{},
storageEventCh: make(chan any, 100),
eventCh: make(chan any, 100),
doneCh: make(chan struct{}),
}

@@ -108,7 +111,7 @@ func New(config Config) (*DB, error) {
PointerNodeAllocator: pointerNodeAllocator,
DataNodeAllocator: snapshotInfoNodeAllocator,
MassEntry: mass.New[space.Entry[types.SnapshotID, types.SnapshotInfo]](1000),
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
})

db.spaces = space.New[types.SpaceID, types.SpaceInfo](space.Config[types.SpaceID, types.SpaceInfo]{
@@ -121,7 +124,7 @@ func New(config Config) (*DB, error) {
PointerNodeAllocator: pointerNodeAllocator,
DataNodeAllocator: spaceInfoNodeAllocator,
MassEntry: mass.New[space.Entry[types.SpaceID, types.SpaceInfo]](1000),
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
})

db.deallocationLists = space.New[types.SnapshotID, types.Pointer](
@@ -135,7 +138,7 @@ func New(config Config) (*DB, error) {
PointerNodeAllocator: pointerNodeAllocator,
DataNodeAllocator: snapshotToNodeNodeAllocator,
MassEntry: mass.New[space.Entry[types.SnapshotID, types.Pointer]](1000),
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
},
)

@@ -172,8 +175,8 @@ type DB struct {
deallocationListsToCommit map[types.SnapshotID]ListToCommit
availableSnapshots map[types.SnapshotID]struct{}

storageEventCh chan any
doneCh chan struct{}
eventCh chan any
doneCh chan struct{}
}

// DeleteSnapshot deletes snapshot.
@@ -211,7 +214,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types
PointerNodeAllocator: db.pointerNodeAllocator,
DataNodeAllocator: db.snapshotToNodeNodeAllocator,
MassEntry: db.massSnapshotToNodeEntry,
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
},
)
} else {
@@ -232,7 +235,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types
continue
}

db.storageEventCh <- types.ListDeallocationEvent{
db.eventCh <- types.ListDeallocationEvent{
ListRoot: snapshotItem.Value,
}

@@ -265,7 +268,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types
ListRoot: &newNextListNodeAddress,
State: db.config.State,
NodeAllocator: db.listNodeAllocator,
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
})
if err != nil {
return err
@@ -285,7 +288,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types
}
}

db.storageEventCh <- types.SpaceDeallocationEvent{
db.eventCh <- types.SpaceDeallocationEvent{
SpaceRoot: deallocationListsRoot,
}

@@ -340,7 +343,7 @@ func (db *DB) DeleteSnapshot(snapshotID types.SnapshotID, pool *alloc.Pool[types
// Commit commits current snapshot and returns next one.
func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error {
doneCh := make(chan struct{})
db.storageEventCh <- types.DBCommitEvent{
db.eventCh <- types.DBCommitEvent{
DoneCh: doneCh,
}
<-doneCh
@@ -383,7 +386,7 @@ func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error {
}

doneCh = make(chan struct{})
db.storageEventCh <- types.DBCommitEvent{
db.eventCh <- types.DBCommitEvent{
DoneCh: doneCh,
}
<-doneCh
@@ -397,7 +400,7 @@ func (db *DB) Commit(pool *alloc.Pool[types.LogicalAddress]) error {

// Close closed DB.
func (db *DB) Close() {
close(db.storageEventCh)
close(db.eventCh)
db.config.State.Close()

<-db.doneCh
@@ -408,33 +411,76 @@ func (db *DB) Run(ctx context.Context) error {
defer close(db.doneCh)

return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
spawn("state", parallel.Fail, db.config.State.Run)
spawn("storageEvents", parallel.Continue, func(ctx context.Context) error {
return db.processStorageEvents(ctx, db.storageEventCh)
storeRequestCh := make(chan types.StoreRequest, 500)

spawn("state", parallel.Continue, db.config.State.Run)
spawn("events", parallel.Continue, func(ctx context.Context) error {
defer close(storeRequestCh)
return db.processEvents(ctx, db.eventCh, storeRequestCh)
})
spawn("storeRequests", parallel.Continue, func(ctx context.Context) error {
return db.processStoreRequests(ctx, storeRequestCh)
})

return nil
})
}

func (db *DB) processStorageEvents(ctx context.Context, storageEventCh <-chan any) error {
func (db *DB) processEvents(
ctx context.Context,
eventCh <-chan any,
storeRequestCh chan<- types.StoreRequest,
) error {
volatilePool := db.config.State.NewPool()
persistentPool := alloc.NewPool[types.PhysicalAddress](db.persistentAllocationCh, db.persistentAllocationCh)

pointerNode := db.pointerNodeAllocator.NewNode()
parentPointerNode := db.pointerNodeAllocator.NewNode()
listNode := db.listNodeAllocator.NewNode()

for event := range storageEventCh {
for event := range eventCh {
switch e := event.(type) {
case types.SpacePointerNodeAllocatedEvent:
pointerNodeAddress := e.Pointer.LogicalAddress
for pointerNodeAddress != 0 {
header := photon.FromPointer[space.PointerNodeHeader](db.config.State.Node(pointerNodeAddress))
if header.RevisionHeader.SnapshotID == db.singularityNode.LastSnapshotID {
db.pointerNodeAllocator.Get(e.NodeAddress, pointerNode)

for {
var pointer *types.Pointer
if pointerNode.Header.ParentNodeAddress == 0 {
pointer = e.RootPointer
} else {
db.pointerNodeAllocator.Get(pointerNode.Header.ParentNodeAddress, parentPointerNode)
spacePointer, _ := parentPointerNode.Item(pointerNode.Header.ParentNodeIndex)
pointer = &spacePointer.Pointer
}

revision := atomic.AddUint64(&pointerNode.Header.RevisionHeader.Revision, 1)

if pointerNode.Header.RevisionHeader.SnapshotID != db.singularityNode.LastSnapshotID {
pointerNode.Header.RevisionHeader.SnapshotID = db.singularityNode.LastSnapshotID
pointer.PhysicalAddress = 0
}

terminate := true
if pointer.PhysicalAddress == 0 {
var err error
pointer.PhysicalAddress, err = persistentPool.Allocate()
if err != nil {
return err
}

terminate = pointer == e.RootPointer
}

storeRequestCh <- types.StoreRequest{
Revision: revision,
Pointer: pointer,
}

if terminate {
break
}
header.RevisionHeader.SnapshotID = db.singularityNode.LastSnapshotID
pointerNodeAddress = header.ParentNodeAddress

pointerNode = parentPointerNode
}
case types.SpaceDataNodeAllocatedEvent:
header := photon.FromPointer[space.DataNodeHeader](db.config.State.Node(e.Pointer.LogicalAddress))
Expand Down Expand Up @@ -492,12 +538,44 @@ func (db *DB) processStorageEvents(ctx context.Context, storageEventCh <-chan an
listNode,
)
case types.DBCommitEvent:
ch := make(chan struct{}, 1)
storeRequestCh <- types.StoreRequest{
DoneCh: ch,
}
<-ch
close(e.DoneCh)
}
}
return errors.WithStack(ctx.Err())
}

func (db *DB) processStoreRequests(ctx context.Context, storeRequestCh <-chan types.StoreRequest) error {
for req := range storeRequestCh {
if req.Pointer != nil {
header := photon.FromPointer[types.RevisionHeader](db.config.State.Node(req.Pointer.LogicalAddress))
revision := atomic.LoadUint64(&header.Revision)

if revision != req.Revision {
continue
}

if err := db.config.Store.Write(
req.Pointer.PhysicalAddress,
db.config.State.Bytes(req.Pointer.LogicalAddress),
); err != nil {
return err
}

// TODO: Store node

continue
}

req.DoneCh <- struct{}{}
}
return errors.WithStack(ctx.Err())
}

func (db *DB) prepareNextSnapshot() error {
var snapshotID types.SnapshotID
if db.singularityNode.SnapshotRoot.State != types.StateFree {
Expand Down Expand Up @@ -564,7 +642,7 @@ func GetSpace[K, V comparable](spaceID types.SpaceID, db *DB) (*space.Space[K, V
HashMod: s.HashMod,
SpaceRoot: s.PInfo,
State: db.config.State,
StorageEventCh: db.storageEventCh,
StorageEventCh: db.eventCh,
PointerNodeAllocator: db.pointerNodeAllocator,
DataNodeAllocator: dataNodeAllocator,
MassEntry: mass.New[space.Entry[K, V]](1000),
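
The core of the db.go change is a two-stage pipeline: processEvents walks each allocated pointer node up toward the root, bumps the node's revision atomically, assigns a physical address where one is missing, and enqueues a StoreRequest per node; processStoreRequests re-checks the revision before writing and skips requests that a later modification of the same node has already superseded. A DBCommitEvent becomes a barrier request whose DoneCh is signalled once the store worker has drained everything queued ahead of it. Below is a stripped-down sketch of the revision guard, with hypothetical request and node types standing in for types.StoreRequest and the node headers:

package example

import "sync/atomic"

// node stands in for a node header with a revision counter; request mirrors
// the shape of a store request. Both are illustrative, not the repo's types.
type node struct {
    revision uint64 // bumped atomically on every modification
    data     []byte
}

type request struct {
    revision uint64
    node     *node
}

// enqueue is the producer side: bump the revision first, then queue the write.
func enqueue(n *node, ch chan<- request) {
    rev := atomic.AddUint64(&n.revision, 1)
    ch <- request{revision: rev, node: n}
}

// consume is the store side: a stale revision means a newer request for the
// same node is already queued behind this one, so the write can be skipped.
func consume(ch <-chan request, write func([]byte) error) error {
    for req := range ch {
        if atomic.LoadUint64(&req.node.revision) != req.revision {
            continue
        }
        if err := write(req.node.data); err != nil {
            return err
        }
    }
    return nil
}

The ordering is what makes the skip safe: the revision is incremented before the request is queued, so whenever the consumer sees a mismatch, a newer request for that node is guaranteed to follow.
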
11 changes: 4 additions & 7 deletions persistent/memory.go
@@ -4,6 +4,8 @@ import (
"syscall"

"github.com/pkg/errors"

"github.com/outofforest/quantum/types"
)

// NewMemoryStore creates new in-memory "persistent" store.
@@ -29,13 +31,8 @@ type MemoryStore struct {
data []byte
}

// Size returns size of the store.
func (s *MemoryStore) Size() uint64 {
return uint64(len(s.data))
}

// Write writes data to the store.
func (s *MemoryStore) Write(offset uint64, data []byte) error {
copy(s.data[offset:], data)
func (s *MemoryStore) Write(address types.PhysicalAddress, data []byte) error {
copy(s.data[address:], data)
return nil
}
5 changes: 3 additions & 2 deletions persistent/types.go
@@ -1,7 +1,8 @@
package persistent

import "github.com/outofforest/quantum/types"

// Store defines the interface of the store.
type Store interface {
Size() uint64
Write(offset uint64, data []byte) error
Write(address types.PhysicalAddress, data []byte) error
}
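
Dropping Size() and switching the offset parameter to types.PhysicalAddress narrows the interface to the single operation the DB actually needs, so any backend that can write a byte slice at a physical address satisfies it. As an illustration (not part of this commit), a file-backed store could look like the sketch below; it assumes types.PhysicalAddress is an integer type, which its use as a byte offset in MemoryStore.Write suggests:

package persistent

import (
    "os"

    "github.com/pkg/errors"

    "github.com/outofforest/quantum/types"
)

// FileStore is an illustrative file-backed Store implementation.
type FileStore struct {
    file *os.File
}

// NewFileStore opens (or creates) the backing file.
func NewFileStore(path string) (*FileStore, error) {
    f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600)
    if err != nil {
        return nil, errors.WithStack(err)
    }
    return &FileStore{file: f}, nil
}

// Write writes data at the given physical address, treated as a byte offset
// into the file.
func (s *FileStore) Write(address types.PhysicalAddress, data []byte) error {
    _, err := s.file.WriteAt(data, int64(address))
    return errors.WithStack(err)
}
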
11 changes: 8 additions & 3 deletions space/alloc.go
@@ -85,6 +85,11 @@ func (na *NodeAllocator[H, T]) Allocate(
return nodeAddress, nil
}

// Index returns index from hash.
func (na *NodeAllocator[H, T]) Index(hash types.Hash) uintptr {
return uintptr(hash) % na.numOfItems
}

// Shift shifts bits in hash.
func (na *NodeAllocator[H, T]) Shift(hash types.Hash) types.Hash {
return hash / types.Hash(na.numOfItems)
Expand All @@ -100,6 +105,7 @@ func (na *NodeAllocator[H, T]) project(nodeP unsafe.Pointer, node *Node[H, T]) {
type PointerNodeHeader struct {
RevisionHeader types.RevisionHeader
ParentNodeAddress types.LogicalAddress
ParentNodeIndex uintptr
HashMod uint64
}

@@ -118,9 +124,8 @@ type Node[H, T comparable] struct {
itemsP unsafe.Pointer
}

// ItemByHash returns pointers to the item and its state by hash.
func (sn *Node[H, T]) ItemByHash(hash types.Hash) (*T, *types.State) {
index := uintptr(hash) % sn.numOfItems
// Item returns pointers to the item and its state by index.
func (sn *Node[H, T]) Item(index uintptr) (*T, *types.State) {
return (*T)(unsafe.Add(sn.itemsP, sn.itemSize*index)), (*types.State)(unsafe.Add(sn.statesP, index))
}

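Splitting ItemByHash into Index and Item lets the caller keep the computed index, which db.go now reads back as ParentNodeIndex to locate a node's slot in its parent without rehashing. A sketch of one descend step composed from the signatures shown above (the helper itself is hypothetical):

package example

import (
    "github.com/outofforest/quantum/space"
    "github.com/outofforest/quantum/types"
)

// descendOnce shows how Index, Item and Shift compose: compute the slot,
// fetch the item, and strip the consumed hash bits before going one level
// down. Illustrative only.
func descendOnce[H, T comparable](
    na *space.NodeAllocator[H, T],
    node *space.Node[H, T],
    hash types.Hash,
) (*T, *types.State, types.Hash, uintptr) {
    index := na.Index(hash)         // slot within this node
    item, state := node.Item(index) // the pair ItemByHash used to return
    return item, state, na.Shift(hash), index
}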